The NVIDIA Jetson Thor represents a paradigm shift in edge computing for physical AI. Powered by a Blackwell-architecture GPU with 2,560 CUDA cores and paired with 128GB of LPDDR5X memory, Thor delivers an astounding 2,070 FP4 TFLOPS of AI compute: 7.5x the AI performance and 3.5x the energy efficiency of Jetson AGX Orin.
## **Key Specifications**
| Feature | Jetson Thor T5000 |
|---|---|
| GPU | Blackwell Architecture |
| AI Performance | 2070 FP4 TFLOPS |
| Memory | 128 GB LPDDR5X |
| CPU | 14-core Arm Neoverse V3AE |
| Power | Configurable 40W-130W |
| Video Encode | 6x 4Kp60 (H.265/H.264) |
| Video Decode | 4x 8Kp30 or 10x 4Kp60 |
| Cameras | Up to 20 cameras |
| Networking | 4x 25GbE |
| Price | Developer Kit: $3,499 |
Let’s look at the top 5 use cases where you can leverage NVIDIA Jetson AGX Thor:
## **Use Case 1: Humanoid Robot Control with Isaac GR00T N1.6**
### **Why Thor for Humanoid Robotics?**
Humanoid robots require real-time multi-modal perception, reasoning, and control—exactly what Thor was designed for. Companies like Boston Dynamics, Figure AI, and Agility Robotics are already using Thor for next-generation humanoids.
### **Architecture Overview**
```
┌─────────────────────────────────────────┐
│ Vision-Language-Action (VLA) Model │
│ Isaac GR00T N1.6 │
├─────────────────────────────────────────┤
│ Multi-Sensor Input Processing │
│ • Stereo Cameras (Perception) │
│ • LiDAR (Spatial Mapping) │
│ • IMU (Balance & Orientation) │
│ • Force/Torque Sensors │
├─────────────────────────────────────────┤
│ Jetson Thor Hardware │
│ • Blackwell GPU (2070 TFLOPS) │
│ • 14-core CPU │
│ • PVA (Programmable Vision Accel) │
│ • 128GB Memory for Large Models │
└─────────────────────────────────────────┘
```
### **Step-by-Step Setup Tutorial**
1. Prerequisites
# Hardware Requirements
- NVIDIA Jetson AGX Thor Developer Kit
- microSD card (64GB+) or NVMe SSD (500GB+ recommended)
- Power supply (130W)
- Internet connection
# Software Requirements
- JetPack 7.0 or later
- CUDA 13.0
- Docker with NVIDIA Container Runtime
2. Install JetPack and Flash Thor
# Download JetPack 7.0 from NVIDIA Developer Portal
# Flash using SDK Manager or Jetson USB method
# Verify installation
cat /etc/nv_tegra_release
# Expected output: R38 (release), REVISION: 2.1
# Check CUDA version
nvcc --version
# Should show CUDA 13.0
3. Setup Isaac ROS Environment
# Create workspace
mkdir -p /mnt/nova_ssd/workspaces/isaac_ros-dev/src
export ISAAC_ROS_WS="/mnt/nova_ssd/workspaces/isaac_ros-dev/"
# Add to bashrc for persistence
echo 'export ISAAC_ROS_WS="${ISAAC_ROS_WS:-/mnt/nova_ssd/workspaces/isaac_ros-dev/}"' >> ~/.bashrc
source ~/.bashrc
# Configure Isaac ROS Apt Repository
sudo apt update && sudo apt install locales
sudo locale-gen en_US en_US.UTF-8
sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8
export LANG=en_US.UTF-8
# Install Isaac ROS CLI
sudo apt-get install -y isaac-ros-cli
4. Install NVIDIA Container Toolkit
# Install prerequisites
sudo apt-get update
sudo apt-get install -y ca-certificates curl gnupg
# Add NVIDIA Container Toolkit keyring
curl -fsSL https://nvidia.github.io/libnvidia-container/gpgkey | \
sudo gpg --dearmor -o /usr/share/keyrings/nvidia-container-toolkit-keyring.gpg --yes
# Add repository
echo "deb [signed-by=/usr/share/keyrings/nvidia-container-toolkit-keyring.gpg] https://nvidia.github.io/libnvidia-container/stable/deb/$(dpkg --print-architecture) /" | \
sudo tee /etc/apt/sources.list.d/nvidia-container-toolkit.list
# Install toolkit
sudo apt-get update
sudo apt-get install -y nvidia-container-toolkit
# Configure Docker daemon
sudo nvidia-ctk runtime configure --runtime=docker
sudo systemctl restart docker
# Test Docker GPU access
docker run --rm --runtime=nvidia --gpus all nvidia/cuda:12.4.0-base-ubuntu22.04 nvidia-smi
5. Clone Isaac GR00T Repository
cd $ISAAC_ROS_WS/src
git clone https://github.com/NVIDIA/Isaac-GR00T.git
cd Isaac-GR00T
# Install dependencies
pip install uv
uv pip install -e .
6. Download Pre-trained GR00T Model
# Download GR00T N1.6-3B base model (for GR1 humanoid platform)
huggingface-cli download nvidia/GR00T-N1.6-3B --local-dir ./models/gr00t-n1.6-3b
# Or for larger 7B model (requires more VRAM)
huggingface-cli download nvidia/GR00T-N1.6-7B --local-dir ./models/gr00t-n1.6-7b
7. Create the GR00T Policy Server
# gr00t/eval/run_gr00t_server.py
import torch
from gr00t.policy import Gr00tPolicy
from flask import Flask, request, jsonify

app = Flask(__name__)

# Load GR00T model
device = "cuda" if torch.cuda.is_available() else "cpu"
policy = Gr00tPolicy.from_pretrained(
    "nvidia/GR00T-N1.6-3B",
    device=device,
    dtype=torch.float16  # Use FP16 for efficiency
)

@app.route('/predict', methods=['POST'])
def predict_action():
    """
    Input: {
        'images': [...],       # RGB images from cameras
        'proprio': [...],      # Joint positions, velocities
        'instruction': "pick up the red cube"
    }
    Output: {
        'actions': [...]       # Joint commands for next 16 timesteps
    }
    """
    data = request.json

    # Prepare inputs
    images = torch.tensor(data['images']).to(device)
    proprio = torch.tensor(data['proprio']).to(device)
    instruction = data['instruction']

    # Run inference
    with torch.no_grad():
        actions = policy.predict(
            images=images,
            proprio=proprio,
            instruction=instruction,
            chunk_size=16  # Predict 16 timesteps ahead
        )

    return jsonify({'actions': actions.cpu().tolist()})

if __name__ == '__main__':
    print(f"GR00T Policy Server running on {device}")
    print("Model: GR00T N1.6-3B")
    app.run(host='0.0.0.0', port=5000)
8. Run the Policy Server
# Start the server
cd $ISAAC_ROS_WS/src/Isaac-GR00T
uv run python gr00t/eval/run_gr00t_server.py --embodiment-tag GR1 --model-path nvidia/GR00T-N1.6-3B
# Expected Output:
# Loading checkpoint from nvidia/GR00T-N1.6-3B
# Model loaded on cuda:0
# GR00T Policy Server running on cuda
# * Running on http://0.0.0.0:5000
9. Client Code to Control Robot
# robot_control_client.py
import time

import requests
import numpy as np

class HumanoidController:
    def __init__(self, server_url="http://localhost:5000"):
        self.server_url = server_url

    def get_action(self, camera_images, joint_states, instruction):
        """Send sensor data to GR00T and receive actions"""
        # Prepare payload
        payload = {
            'images': [img.tolist() for img in camera_images],
            'proprio': joint_states.tolist(),
            'instruction': instruction
        }

        # Call policy server
        response = requests.post(
            f"{self.server_url}/predict",
            json=payload
        )
        actions = np.array(response.json()['actions'])
        return actions

    def control_loop(self, robot_interface):
        """Main control loop"""
        instruction = "walk forward and pick up the box"

        while True:
            # Get sensor readings
            images = robot_interface.get_camera_images()
            joint_states = robot_interface.get_joint_states()

            # Get actions from GR00T
            actions = self.get_action(images, joint_states, instruction)

            # Execute first action (model predictive control)
            robot_interface.execute_action(actions[0])

            # Wait for next control cycle (typically 30-50 Hz)
            time.sleep(0.02)

# Usage
controller = HumanoidController()
# controller.control_loop(robot_interface)
### **Real-World Applications**
- **Manufacturing**: Assembly line tasks, quality inspection, material handling
- **Healthcare**: Patient assistance, hospital logistics, rehabilitation support
- **Warehousing**: Order fulfillment, inventory management (Amazon, Figure AI)
- **Construction**: Site inspection, tool handling, safety monitoring
### **Performance Benchmarks**
- **Inference Latency**: ~15-30ms per control cycle (3B model)
- **Throughput**: 30-50 Hz control frequency
- **Multi-sensor Processing**: 4+ cameras @ 30 FPS simultaneously
- **Power**: 80-100W typical during operation
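These figures hang together because GR00T-style policies emit action chunks rather than single steps: one inference buys several control ticks, so the buffered actions can cover the next inference. A rough, illustrative calculation (the 30 ms latency and 16-step chunk size are the numbers assumed above, not measurements):

```python
def max_control_rate_hz(inference_ms: float, chunk_size: int) -> float:
    """Highest control rate at which a chunk of buffered actions still
    outlasts one inference: chunk_size / rate >= inference time."""
    return chunk_size / (inference_ms / 1000.0)

# 30 ms inference with 16-step chunks: the chunk alone could cover ~533
# ticks/s, so the practical ceiling is the actuator loop, not the model.
print(round(max_control_rate_hz(30, 16)))  # → 533
```

In practice controllers re-plan before the chunk is exhausted (receding horizon), which is why the code above executes only `actions[0]` per cycle.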
---
## **Use Case 2: Running Large Language Models (70B-120B) at the Edge**
### **Why Thor for Edge LLMs?**
Thor's **128GB of unified memory** enables running models that would require multiple GPUs on other platforms. With NVFP4 quantization, even 120B parameter models can run at impressive speeds.
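To see why 128GB and FP4 matter together, a back-of-the-envelope weight-size calculation helps. This is illustrative arithmetic only; real deployments add KV cache, activations, and runtime overhead on top of the weights:

```python
def weight_memory_gb(params_b: float, bits_per_param: float) -> float:
    """Approximate weight footprint in GB for `params_b` billion parameters
    stored at `bits_per_param` bits each (weights only)."""
    return params_b * 1e9 * bits_per_param / 8 / 1e9

# 70B at FP16 (~140 GB) exceeds Thor's memory; FP4 (~35 GB) leaves headroom.
# Even 120B at FP4 (~60 GB) fits with room for KV cache.
for params in (70, 120):
    for name, bits in (("FP16", 16), ("FP8", 8), ("FP4", 4)):
        print(f"{params}B @ {name}: ~{weight_memory_gb(params, bits):.0f} GB")
```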
### **Architecture**
```
┌──────────────────────────────────────┐
│ Application Layer │
│ • Chat Interface / API Server │
│ • Robot Command Interpreter │
├──────────────────────────────────────┤
│ LLM Inference Engine │
│ • vLLM / SGLang / TensorRT-LLM │
│ • KV Cache Management │
├──────────────────────────────────────┤
│ Model Serving │
│ • Quantized Model (FP4/FP8/INT4) │
│ • Llama 3.2 70B / GPT-OSS 120B │
├──────────────────────────────────────┤
│ Thor Hardware │
│ • 128GB Unified Memory │
│ • Blackwell Tensor Cores (FP4) │
└──────────────────────────────────────┘
```
### **Tutorial: Deploy Llama 3.2 Vision 70B on Thor**
1. Install vLLM
# Install vLLM with CUDA support
pip3 install --break-system-packages vllm
# Verify installation
python3 -c "import vllm; print(vllm.__version__)"
2. Download Llama 3.2 Vision Model
# Install Hugging Face CLI
pip3 install --break-system-packages huggingface-hub
# Login to Hugging Face (requires Meta license approval)
huggingface-cli login
# Download model (will take some time - ~140GB)
huggingface-cli download meta-llama/Llama-3.2-Vision-70B \
--local-dir /mnt/nova_ssd/models/llama-3.2-vision-70b
3. Quantize Model to FP4 (Optional but Recommended)
# quantize_model.py
import torch
from transformers import AutoModelForCausalLM

model_path = "/mnt/nova_ssd/models/llama-3.2-vision-70b"
output_path = "/mnt/nova_ssd/models/llama-3.2-vision-70b-fp4"

# Load model
print("Loading model...")
model = AutoModelForCausalLM.from_pretrained(
    model_path,
    torch_dtype=torch.float16,
    device_map="auto"
)

# Quantize to FP4 using NVIDIA's NVFP4 format
print("Quantizing to FP4...")
from nv_tensorrt_llm.quantization import quantize_model

quantized_model = quantize_model(
    model,
    quant_config="nvfp4",
    calibration_dataset="c4"  # Use C4 for calibration
)

# Save quantized model
quantized_model.save_pretrained(output_path)
print(f"Quantized model saved to {output_path}")
4. Create vLLM Server
# llm_server.py
import torch
from vllm import LLM, SamplingParams
from flask import Flask, request, jsonify

app = Flask(__name__)

# Initialize vLLM with optimizations for Thor
print("Initializing vLLM...")
llm = LLM(
    model="/mnt/nova_ssd/models/llama-3.2-vision-70b-fp4",
    tensor_parallel_size=1,       # Single GPU on Thor
    gpu_memory_utilization=0.90,  # Use 90% of 128GB
    max_model_len=8192,           # Context length
    quantization="nvfp4",         # Use NVIDIA FP4 format
    enforce_eager=False,          # Use CUDA graphs for speed
    dtype="float16"
)

sampling_params = SamplingParams(
    temperature=0.7,
    top_p=0.95,
    max_tokens=512
)

@app.route('/generate', methods=['POST'])
def generate():
    data = request.json
    prompt = data.get('prompt', '')

    # Generate response
    outputs = llm.generate([prompt], sampling_params)
    response = outputs[0].outputs[0].text

    return jsonify({
        'response': response,
        'tokens_generated': len(outputs[0].outputs[0].token_ids),
        'gpu_memory_used': torch.cuda.memory_allocated() / 1e9  # GB
    })

@app.route('/health', methods=['GET'])
def health():
    return jsonify({
        'status': 'healthy',
        'model': 'Llama-3.2-Vision-70B-FP4',
        'device': 'Jetson Thor T5000',
        'memory_available': torch.cuda.get_device_properties(0).total_memory / 1e9
    })

if __name__ == '__main__':
    print("LLM Server ready!")
    app.run(host='0.0.0.0', port=8000, threaded=True)
5. Run the Server
# Start server
python3 llm_server.py
# Expected output:
# Initializing vLLM...
# INFO: Loading model weights...
# INFO: Model loaded successfully
# INFO: Using 115.2 GB / 128 GB memory
# LLM Server ready!
# * Running on http://0.0.0.0:8000
6. Test the API
# Test generation
curl -X POST http://localhost:8000/generate \
-H "Content-Type: application/json" \
-d '{
"prompt": "Explain quantum entanglement in simple terms:"
}'
# Check health
curl http://localhost:8000/health
7. Robot Control Integration
# robot_llm_controller.py
import requests

class RobotLLMController:
    def __init__(self, llm_url="http://localhost:8000"):
        self.llm_url = llm_url

    def interpret_command(self, human_instruction):
        """Convert natural language to robot commands"""
        prompt = f"""
Convert this human instruction into robot commands:
Instruction: "{human_instruction}"
Output JSON format:
{{
    "action": "move" | "grasp" | "place" | "wait",
    "target": "object_name",
    "parameters": {{...}}
}}
JSON:"""
        response = requests.post(
            f"{self.llm_url}/generate",
            json={'prompt': prompt}
        )
        return response.json()['response']

    def explain_failure(self, error_data):
        """Explain robot failure to human"""
        prompt = f"""
The robot encountered an error:
Error: {error_data['error_type']}
Details: {error_data['details']}
Explain this to a human operator in simple terms and suggest solutions:"""
        response = requests.post(
            f"{self.llm_url}/generate",
            json={'prompt': prompt}
        )
        return response.json()['response']

# Usage
controller = RobotLLMController()
command = controller.interpret_command("Pick up the red box and put it on the shelf")
print(command)
### **Performance Benchmarks**
- **70B Model**: ~25-40 tokens/second with FP4 quantization
- **120B Model**: ~15-20 tokens/second with FP4 quantization
- **Memory Usage**: 70B model uses ~85GB, 120B uses ~115GB
- **Latency**: First token in ~200-500ms
- **Power**: 100-120W during inference
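The gap between the quantized weight sizes and the memory-usage figures above is mostly KV cache. A sketch of the standard KV-cache sizing formula, using assumed Llama-70B-class geometry (80 layers, 8 grouped-query KV heads, head dimension 128) at the 8192-token context configured in the server above:

```python
def kv_cache_gb(n_layers, n_kv_heads, head_dim, seq_len,
                bytes_per_elem=2, batch=1):
    """KV cache size: a K and a V tensor per layer, each of shape
    (batch, n_kv_heads, seq_len, head_dim), at bytes_per_elem per value."""
    return 2 * n_layers * n_kv_heads * head_dim * seq_len * bytes_per_elem * batch / 1e9

# FP16 cache for one 8192-token sequence: a few GB per concurrent request,
# which is what vLLM's gpu_memory_utilization budget is spent on.
print(f"~{kv_cache_gb(80, 8, 128, 8192):.1f} GB per sequence")
```

With grouped-query attention the cache grows with KV heads rather than attention heads, which is what keeps long contexts affordable at all.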
### **Applications**
- **Robotics**: Natural language control, task planning, error recovery
- **Edge AI**: Private chatbots for hospitals, factories, retail
- **Autonomous Vehicles**: Real-time decision making without cloud connectivity
- **Industrial**: Maintenance documentation, technical support
---
## **Use Case 3: Multi-Camera Real-Time Video Analytics with Vision Programming Interface**
### **Why Thor for Multi-Camera Systems?**
Thor can handle **up to 20 cameras simultaneously** with:
- Dual NVENC/NVDEC engines
- Programmable Vision Accelerator (PVA) for stereo processing
- Camera Offload Engine
- Holoscan Sensor Bridge for Camera over Ethernet
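A quick sanity check on the pixel throughput this implies. This is illustrative arithmetic; the resolutions and rates are the ones quoted in this section and in the spec table at the top:

```python
def pixel_rate_gpx(width, height, fps, streams):
    """Aggregate pixel throughput in gigapixels per second."""
    return width * height * fps * streams / 1e9

# 20 x 4Kp30 ingest (the architecture diagram below) is the same pixel rate
# as the decoder spec of 10 x 4Kp60 from the spec table: ~5 Gpx/s.
ingest = pixel_rate_gpx(3840, 2160, 30, 20)
decode_spec = pixel_rate_gpx(3840, 2160, 60, 10)
print(f"ingest ~{ingest:.1f} Gpx/s, NVDEC spec ~{decode_spec:.1f} Gpx/s")
```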
### **Architecture**
```
┌────────────────────────────────────────────────┐
│ Application Layer │
│ • Object Detection Dashboard │
│ • Alert System • Video Archive │
├────────────────────────────────────────────────┤
│ AI Inference Layer (on GPU) │
│ • YOLOv8 Object Detection │
│ • Person Re-ID • Activity Recognition │
├────────────────────────────────────────────────┤
│ Video Processing Pipeline (Multi-Accel) │
│ • NVDEC: Hardware decode │
│ • VIC: Scaling, color conversion │
│ • PVA: Stereo, optical flow │
│ • NVENC: Hardware encode │
├────────────────────────────────────────────────┤
│ Camera Inputs (20 streams) │
│ 4K@30fps x 20 cameras (via Holoscan) │
└────────────────────────────────────────────────┘
```
### **Tutorial: Deploy 20-Camera Surveillance System**
1. Setup GStreamer Pipeline
# Install GStreamer plugins
sudo apt-get install -y \
gstreamer1.0-tools \
gstreamer1.0-plugins-good \
gstreamer1.0-plugins-bad \
gstreamer1.0-plugins-ugly \
gstreamer1.0-libav
# Verify NVDEC/NVENC availability
gst-inspect-1.0 nvv4l2decoder
gst-inspect-1.0 nvv4l2h264enc
2. Multi-Camera Capture Script
# multicam_capture.py
import threading

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import numpy as np

Gst.init(None)

class MultiCameraSystem:
    def __init__(self, num_cameras=20):
        self.num_cameras = num_cameras
        self.pipelines = []
        self.frames = [None] * num_cameras
        self.locks = [threading.Lock() for _ in range(num_cameras)]

    def create_pipeline(self, camera_id, rtsp_url):
        """Create GStreamer pipeline with hardware acceleration"""
        pipeline_str = f"""
            rtspsrc location={rtsp_url} latency=0 !
            rtph264depay !
            h264parse !
            nvv4l2decoder !
            nvvidconv !
            video/x-raw, format=BGRx !
            appsink name=sink_{camera_id} max-buffers=1 drop=true
        """
        pipeline = Gst.parse_launch(pipeline_str)
        appsink = pipeline.get_by_name(f"sink_{camera_id}")

        # Set callback for frames
        appsink.set_property('emit-signals', True)
        appsink.connect('new-sample', self.on_new_frame, camera_id)
        return pipeline

    def on_new_frame(self, appsink, camera_id):
        """Callback when new frame arrives"""
        sample = appsink.emit('pull-sample')
        if sample:
            buffer = sample.get_buffer()
            caps = sample.get_caps()

            # Get frame dimensions
            structure = caps.get_structure(0)
            width = structure.get_value('width')
            height = structure.get_value('height')

            # Convert to numpy array
            success, map_info = buffer.map(Gst.MapFlags.READ)
            if success:
                frame = np.ndarray(
                    shape=(height, width, 4),
                    dtype=np.uint8,
                    buffer=map_info.data
                )
                # Store a copy (thread-safe, and valid after unmap)
                with self.locks[camera_id]:
                    self.frames[camera_id] = frame[:, :, :3].copy()  # Drop alpha
                buffer.unmap(map_info)
        return Gst.FlowReturn.OK

    def start_all_cameras(self, rtsp_urls):
        """Start all camera pipelines"""
        for i, url in enumerate(rtsp_urls):
            pipeline = self.create_pipeline(i, url)
            self.pipelines.append(pipeline)
            pipeline.set_state(Gst.State.PLAYING)
            print(f"Camera {i} started")

    def get_frame(self, camera_id):
        """Get latest frame from camera"""
        with self.locks[camera_id]:
            return self.frames[camera_id].copy() if self.frames[camera_id] is not None else None

    def stop_all(self):
        """Stop all pipelines"""
        for pipeline in self.pipelines:
            pipeline.set_state(Gst.State.NULL)

# Usage
if __name__ == "__main__":
    # RTSP camera URLs
    camera_urls = [
        f"rtsp://admin:password@192.168.1.{100+i}:554/stream1"
        for i in range(20)
    ]

    system = MultiCameraSystem(num_cameras=20)
    system.start_all_cameras(camera_urls)

    # Keep running
    try:
        GLib.MainLoop().run()
    except KeyboardInterrupt:
        system.stop_all()
3. Real-Time Object Detection on All Streams
# multi_stream_detection.py
import time
import threading
from collections import deque

from ultralytics import YOLO

from multicam_capture import MultiCameraSystem

class MultiStreamDetector:
    def __init__(self, model_path="yolov8x.pt"):
        # Load YOLOv8 model
        self.model = YOLO(model_path)
        self.model.to('cuda')

        # FPS tracking
        self.fps_queues = [deque(maxlen=30) for _ in range(20)]

    def detect_batch(self, frames):
        """Run detection on a batch; keys are positions within `frames`"""
        # Filter out None frames
        valid_frames = [(i, f) for i, f in enumerate(frames) if f is not None]
        if not valid_frames:
            return {}

        indices, frame_list = zip(*valid_frames)

        # Run batch inference (much faster than individual)
        results = self.model(list(frame_list), stream=False, verbose=False)

        # Map results back to positions in the input batch
        detections = {}
        for idx, result in zip(indices, results):
            detections[idx] = {
                'boxes': result.boxes.xyxy.cpu().numpy(),
                'scores': result.boxes.conf.cpu().numpy(),
                'classes': result.boxes.cls.cpu().numpy()
            }
        return detections

    def process_stream(self, camera_system, output_queue):
        """Main processing loop"""
        batch_size = 4  # Process 4 cameras at a time for optimal GPU usage

        while True:
            start_time = time.time()

            # Gather frames from all cameras
            all_frames = []
            for i in range(camera_system.num_cameras):
                frame = camera_system.get_frame(i)
                all_frames.append(frame)

            # Process in batches
            for batch_start in range(0, len(all_frames), batch_size):
                batch_end = min(batch_start + batch_size, len(all_frames))
                batch_frames = all_frames[batch_start:batch_end]

                # Run detection
                detections = self.detect_batch(batch_frames)

                # Put results in queue, offset back to absolute camera IDs
                for batch_idx, det in detections.items():
                    output_queue.put((batch_start + batch_idx, det))

            # Calculate FPS
            elapsed = time.time() - start_time
            fps = 1.0 / elapsed if elapsed > 0 else 0
            print(f"Processing FPS: {fps:.1f} | "
                  f"Latency: {elapsed*1000:.1f}ms | "
                  f"Streams: {camera_system.num_cameras}")

            time.sleep(0.001)  # Small sleep to prevent CPU spinning

# Usage
if __name__ == "__main__":
    import queue

    # Setup camera system
    camera_urls = [f"rtsp://camera{i}.local/stream" for i in range(20)]
    camera_system = MultiCameraSystem(num_cameras=20)
    camera_system.start_all_cameras(camera_urls)

    # Setup detector
    detector = MultiStreamDetector(model_path="yolov8x.pt")
    output_queue = queue.Queue(maxsize=100)

    # Start detection thread
    detection_thread = threading.Thread(
        target=detector.process_stream,
        args=(camera_system, output_queue),
        daemon=True
    )
    detection_thread.start()

    # Process results
    while True:
        try:
            cam_id, detections = output_queue.get(timeout=1)
            print(f"Camera {cam_id}: {len(detections['boxes'])} objects detected")
        except queue.Empty:
            continue
4. Using Vision Programming Interface (VPI) for Stereo Vision
# stereo_vpi.py
import vpi
import numpy as np
import cv2

class StereoVisionProcessor:
    def __init__(self, max_disparity=128):
        self.max_disparity = max_disparity

        # Create VPI context for PVA (Programmable Vision Accelerator)
        self.backend = vpi.Backend.PVA

        # Create stereo disparity estimator
        self.stereo = vpi.StereoDisparityEstimator(
            output_format=vpi.Format.U16,
            backend=self.backend,
            max_disparity=max_disparity,
            window_size=5,
            confidence_threshold=16
        )

    def process_stereo_pair(self, left_img, right_img):
        """
        Process stereo images using hardware acceleration
        Returns: disparity map and confidence map
        """
        # Convert to VPI images
        with vpi.Backend.CUDA:
            left_vpi = vpi.asimage(left_img, vpi.Format.U8)
            right_vpi = vpi.asimage(right_img, vpi.Format.U8)

        # Run stereo matching on PVA
        with self.backend:
            disparity = self.stereo(left_vpi, right_vpi)
            confidence = self.stereo.confidence

        # Convert back to numpy
        disparity_np = disparity.cpu()
        confidence_np = confidence.cpu()
        return disparity_np, confidence_np

    def depth_from_disparity(self, disparity, baseline_m, focal_px):
        """
        Convert disparity to depth:
        depth (meters) = (baseline * focal_length) / disparity
        """
        # Avoid division by zero
        disparity_safe = np.where(disparity > 0, disparity, 1)
        depth = (baseline_m * focal_px) / disparity_safe
        depth = np.where(disparity > 0, depth, 0)
        return depth

# Usage
if __name__ == "__main__":
    processor = StereoVisionProcessor(max_disparity=128)

    # Load stereo images
    left = cv2.imread('left.png', cv2.IMREAD_GRAYSCALE)
    right = cv2.imread('right.png', cv2.IMREAD_GRAYSCALE)

    # Process
    disparity, confidence = processor.process_stereo_pair(left, right)

    # Convert to depth
    depth = processor.depth_from_disparity(
        disparity,
        baseline_m=0.12,  # 12cm baseline
        focal_px=700      # focal length in pixels
    )

    print(f"Disparity shape: {disparity.shape}")
    print("Processing time: hardware accelerated on PVA")
### **Performance Benchmarks**
- **20 cameras @ 1080p30**: Achievable with YOLOv8m
- **Stereo Processing**: 8 concurrent streams @ 960×600 @ 30 FPS
- **Detection Latency**: <50ms per frame (batched inference)
- **Total System Throughput**: ~600 FPS across all streams
- **Power**: 110-120W under full load
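The throughput figure can be sanity-checked against the batching scheme in the detection script above. Illustrative arithmetic only; it separates the throughput requirement from the per-frame end-to-end latency figure, which includes queueing and capture:

```python
def required_batch_ms(streams, fps_per_stream, batch_size):
    """Batch completion time needed to sustain the aggregate frame rate."""
    aggregate_fps = streams * fps_per_stream
    return batch_size / aggregate_fps * 1000

# 20 streams x 30 FPS = 600 FPS aggregate; with the script's batches of 4
# frames, the GPU must finish each batch in under ~6.7 ms on average.
print(f"{required_batch_ms(20, 30, 4):.1f} ms per batch")
```

Larger batches relax the per-batch deadline proportionally, at the cost of added buffering latency.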
### **Applications**
- **Smart Cities**: Traffic monitoring, parking management
- **Retail**: Customer analytics, theft prevention
- **Industrial**: Quality control, safety monitoring
- **Airports**: Security, crowd management
## **Use Case 4: Autonomous Mobile Robot Navigation with Isaac ROS**
### **Why Thor for AMR Navigation?**
Isaac ROS provides GPU-accelerated perception and navigation packages optimized for Thor’s hardware accelerators.
### **Tutorial: Deploy SLAM and Navigation**
1. Install Isaac ROS Packages
# Add Isaac ROS Apt repository (already done in Use Case 1)
# Install core packages
sudo apt-get install -y \
ros-jazzy-isaac-ros-visual-slam \
ros-jazzy-isaac-ros-nvblox \
ros-jazzy-isaac-ros-apriltag \
ros-jazzy-isaac-ros-depth-segmentation
# Install Navigation2
sudo apt-get install -y ros-jazzy-navigation2 ros-jazzy-nav2-bringup
2. cuVSLAM Configuration
# cuVSLAM.yaml
/**:
  ros__parameters:
    # Visual SLAM parameters
    enable_imu_fusion: true
    enable_slam_visualization: true
    enable_localization_n_mapping: true

    # Camera parameters
    camera_optical_frames:
      - camera_left_optical_frame
      - camera_right_optical_frame

    # IMU parameters
    imu_frame: imu_link

    # Performance tuning for Thor
    num_threads: 8  # Use multiple CPU cores
    gpu_device_id: 0

    # Map parameters
    enable_observations_view: true
    enable_landmarks_view: true
    path_max_size: 1024
3. Launch cuVSLAM
# cuVSLAM_launch.py
from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        # cuVSLAM node
        Node(
            package='isaac_ros_visual_slam',
            executable='isaac_ros_visual_slam',
            name='visual_slam',
            parameters=[{
                'enable_slam_visualization': True,
                'enable_landmarks_view': True,
                'enable_observations_view': True,
                'num_cameras': 2,
                'min_num_images': 2,
                'enable_imu_fusion': True,
                'rectified_images': True,
            }],
            remappings=[
                ('stereo_camera/left/image', '/camera/left/image_rect'),
                ('stereo_camera/right/image', '/camera/right/image_rect'),
                ('stereo_camera/left/camera_info', '/camera/left/camera_info'),
                ('stereo_camera/right/camera_info', '/camera/right/camera_info'),
                ('visual_slam/imu', '/imu/data'),
            ]
        ),
        # RViz for visualization
        Node(
            package='rviz2',
            executable='rviz2',
            name='rviz2',
            arguments=['-d', '/path/to/slam.rviz']
        ),
    ])
4. Run Navigation Stack
# Launch cuVSLAM
ros2 launch isaac_ros_visual_slam cuVSLAM_launch.py
# Launch Navigation2
ros2 launch nav2_bringup navigation_launch.py \
use_sim_time:=false \
params_file:=/path/to/nav2_params.yaml
# Send navigation goal
ros2 topic pub /goal_pose geometry_msgs/PoseStamped "
{
header: {frame_id: 'map'},
pose: {
position: {x: 5.0, y: 3.0, z: 0.0},
orientation: {x: 0.0, y: 0.0, z: 0.0, w: 1.0}
}
}"
5. Python Navigation Client
# amr_navigation_client.py
import math
import time

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped
from nav_msgs.msg import Odometry, Path

class AMRNavigationClient(Node):
    def __init__(self):
        super().__init__('amr_navigation_client')

        # Publishers
        self.goal_pub = self.create_publisher(
            PoseStamped,
            '/goal_pose',
            10
        )

        # Subscribers
        self.odom_sub = self.create_subscription(
            Odometry,
            '/visual_slam/tracking/odometry',
            self.odom_callback,
            10
        )
        self.path_sub = self.create_subscription(
            Path,
            '/plan',
            self.path_callback,
            10
        )

        self.current_pose = None

    def odom_callback(self, msg):
        """Update current robot position"""
        self.current_pose = msg.pose.pose

    def path_callback(self, msg):
        """Log planned path"""
        self.get_logger().info(
            f'Received path with {len(msg.poses)} waypoints'
        )

    def send_goal(self, x, y, theta=0.0):
        """Send navigation goal"""
        goal = PoseStamped()
        goal.header.frame_id = 'map'
        goal.header.stamp = self.get_clock().now().to_msg()
        goal.pose.position.x = x
        goal.pose.position.y = y
        goal.pose.position.z = 0.0

        # Convert theta (yaw) to quaternion
        goal.pose.orientation.z = math.sin(theta / 2.0)
        goal.pose.orientation.w = math.cos(theta / 2.0)

        self.goal_pub.publish(goal)
        self.get_logger().info(f'Sent goal: ({x}, {y}, {theta})')

    def navigate_waypoints(self, waypoints):
        """Navigate through list of waypoints"""
        for i, (x, y, theta) in enumerate(waypoints):
            self.get_logger().info(f'Waypoint {i+1}/{len(waypoints)}')
            self.send_goal(x, y, theta)

            # Wait for robot to reach goal (simplified)
            time.sleep(10)  # In practice, check distance to goal

def main():
    rclpy.init()
    client = AMRNavigationClient()

    # Define warehouse waypoints
    waypoints = [
        (5.0, 0.0, 0.0),   # Move forward
        (5.0, 5.0, 1.57),  # Turn left
        (0.0, 5.0, 3.14),  # Move back
        (0.0, 0.0, 0.0),   # Return to start
    ]

    # Execute mission
    client.navigate_waypoints(waypoints)
    rclpy.shutdown()

if __name__ == '__main__':
    main()
### **Performance**
- **cuVSLAM**: 30 FPS stereo processing with <30ms latency
- **Mapping Rate**: 1000+ landmarks/sec
- **Navigation Update**: 20 Hz path planning
- **Power**: 70-90W during navigation
## **Use Case 5: Video Search and Summarization with Multi-Modal AI**
### **Why Thor for VSS?**
Thor’s Multi-Instance GPU (MIG) technology allows running multiple AI models concurrently for efficient video analysis.
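Applications need no special code to benefit from MIG: partitions are created at the system level, and each model then runs as an independent worker bound to its own instance. A minimal, hardware-free sketch of that worker pattern, with tagged placeholder loops standing in for the real indexing and summarization models:

```python
import threading
import queue

results = queue.Queue()

def worker(name, num_items):
    # A real worker would bind to one MIG instance and run its model;
    # here a tagged loop stands in for each pipeline.
    for i in range(num_items):
        results.put((name, i))

# Two independent pipelines running concurrently
threads = [
    threading.Thread(target=worker, args=("indexer", 3)),
    threading.Thread(target=worker, args=("summarizer", 2)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

items = []
while not results.empty():
    items.append(results.get())
print(sorted(items))
```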
### **Tutorial: Build Video Search System**
1. Install Dependencies
# Install video processing tools
pip3 install --break-system-packages \
opencv-python \
moviepy \
faiss-gpu \
sentence-transformers
# Install VLM dependencies
pip3 install --break-system-packages \
transformers \
torch \
pillow
2. Video Indexing System
# video_indexer.py
import pickle
from pathlib import Path

import torch
from transformers import CLIPProcessor, CLIPModel
from PIL import Image
import cv2
import numpy as np
import faiss

class VideoIndexer:
    def __init__(self, model_name="openai/clip-vit-large-patch14"):
        self.device = "cuda"

        # Load CLIP model for vision-language understanding
        print("Loading CLIP model...")
        self.model = CLIPModel.from_pretrained(model_name).to(self.device)
        self.processor = CLIPProcessor.from_pretrained(model_name)

        # FAISS index for fast similarity search
        self.dimension = 768  # CLIP ViT-L/14 embedding dimension
        # Inner product equals cosine similarity on L2-normalized vectors
        self.index = faiss.IndexFlatIP(self.dimension)

        # Metadata storage
        self.frame_metadata = []

    def extract_frames(self, video_path, fps=1):
        """Extract frames from video at specified FPS"""
        cap = cv2.VideoCapture(video_path)
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        frame_interval = max(1, int(video_fps / fps))

        frames = []
        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_count % frame_interval == 0:
                # Convert BGR to RGB
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append((frame_count / video_fps, frame_rgb))
            frame_count += 1

        cap.release()
        return frames

    def encode_frames(self, frames):
        """Encode frames to CLIP embeddings"""
        embeddings = []
        batch_size = 32

        for i in range(0, len(frames), batch_size):
            batch_frames = [f[1] for f in frames[i:i+batch_size]]

            # Process images
            images = [Image.fromarray(f) for f in batch_frames]
            inputs = self.processor(images=images, return_tensors="pt", padding=True)
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            # Get L2-normalized embeddings
            with torch.no_grad():
                image_features = self.model.get_image_features(**inputs)
                image_features = image_features / image_features.norm(p=2, dim=-1, keepdim=True)

            embeddings.append(image_features.cpu().numpy())

        return np.vstack(embeddings)

    def index_video(self, video_path):
        """Index entire video"""
        print(f"Processing {video_path}...")

        # Extract frames
        frames = self.extract_frames(video_path, fps=1)
        print(f"Extracted {len(frames)} frames")

        # Encode frames
        embeddings = self.encode_frames(frames)

        # Add to FAISS index
        self.index.add(embeddings.astype('float32'))

        # Store metadata
        for timestamp, _frame in frames:
            self.frame_metadata.append({
                'video': str(video_path),
                'timestamp': timestamp,
                'frame_id': len(self.frame_metadata)
            })

        print(f"Indexed {len(frames)} frames from {video_path}")

    def search(self, text_query, k=10):
        """Search videos using text query"""
        # Encode text query
        inputs = self.processor(text=[text_query], return_tensors="pt", padding=True)
        inputs = {k: v.to(self.device) for k, v in inputs.items()}

        with torch.no_grad():
            text_features = self.model.get_text_features(**inputs)
            text_features = text_features / text_features.norm(p=2, dim=-1, keepdim=True)

        query_embedding = text_features.cpu().numpy().astype('float32')

        # Search FAISS index
        scores, indices = self.index.search(query_embedding, k)

        # Get results
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx < len(self.frame_metadata):
                result = self.frame_metadata[idx].copy()
                result['score'] = float(score)
                results.append(result)

        return results

    def save_index(self, path):
        """Save index and metadata"""
        faiss.write_index(self.index, f"{path}.index")
        with open(f"{path}.metadata", 'wb') as f:
            pickle.dump(self.frame_metadata, f)

    def load_index(self, path):
        """Load index and metadata"""
        self.index = faiss.read_index(f"{path}.index")
        with open(f"{path}.metadata", 'rb') as f:
            self.frame_metadata = pickle.load(f)

# Usage
if __name__ == "__main__":
    indexer = VideoIndexer()

    # Index videos
    video_dir = Path("/mnt/nova_ssd/videos/")
    for video_file in video_dir.glob("*.mp4"):
        indexer.index_video(str(video_file))

    # Save index
    indexer.save_index("/mnt/nova_ssd/video_index")

    # Search
    results = indexer.search("person walking with a dog", k=5)
    for r in results:
        print(f"Video: {r['video']} | Time: {r['timestamp']:.1f}s | Score: {r['score']:.3f}")
3. Video Summarization
# video_summarizer.py
import numpy as np
import torch
from transformers import AutoModelForCausalLM, AutoProcessor
import cv2
from PIL import Image

class VideoSummarizer:
    def __init__(self, model_name="microsoft/Phi-3.5-vision-instruct"):
        self.device = "cuda"

        # Load vision-language model and its processor
        print("Loading VLM for summarization...")
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16,
            trust_remote_code=True
        ).to(self.device)
        self.processor = AutoProcessor.from_pretrained(
            model_name,
            trust_remote_code=True
        )

    def extract_key_frames(self, video_path, num_frames=10):
        """Extract representative frames"""
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Sample frames evenly
        frame_indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)

        frames = []
        for idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
            ret, frame = cap.read()
            if ret:
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(Image.fromarray(frame_rgb))

        cap.release()
        return frames

    def summarize_video(self, video_path):
        """Generate text summary of video"""
        # Extract key frames
        frames = self.extract_key_frames(video_path, num_frames=8)

        # Prepare prompt with one image placeholder per frame
        image_tags = "".join(f"<|image_{i+1}|>\n" for i in range(len(frames)))
        prompt = f"""{image_tags}Analyze these frames from a video and provide a concise summary including:
1. Main activities or events
2. People or objects present
3. Setting/location
4. Notable actions or changes
Summary:"""

        # Encode frames + prompt together
        inputs = self.processor(prompt, frames, return_tensors="pt").to(self.device)

        # Generate summary
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=256,
                temperature=0.7,
                do_sample=True
            )

        summary = self.processor.decode(outputs[0], skip_special_tokens=True)
        return summary.split("Summary:")[-1].strip()

# Usage
if __name__ == "__main__":
    summarizer = VideoSummarizer()
    video_path = "/mnt/nova_ssd/videos/security_cam_001.mp4"

    summary = summarizer.summarize_video(video_path)
    print("Video Summary:")
    print(summary)
### **Performance**
- **Indexing**: ~1 FPS per video stream
- **Search Latency**: <100ms for 10k+ frames
- **Summarization**: ~5-10 seconds per video
- **Concurrent Streams**: 4+ using MIG
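The sub-100 ms search figure is plausible because the index is exact brute force over a modest corpus, and the cost per query is easy to bound. Illustrative arithmetic:

```python
def flat_search_flops(n_vectors, dim):
    """Operations for one query against a flat (exact) inner-product index:
    one multiply and one add per dimension per stored vector."""
    return 2 * n_vectors * dim

# 10k indexed frames at CLIP's 768-dim embeddings: ~15 MFLOPs per query,
# trivial next to the TFLOPS-scale compute quoted for Thor.
print(f"{flat_search_flops(10_000, 768) / 1e6:.2f} MFLOPs")
```

Only at millions of frames would an approximate index (e.g. FAISS IVF or HNSW) become worthwhile.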
## **Conclusion**
NVIDIA Jetson Thor represents a quantum leap in edge AI capabilities. With 2,070 FP4 TFLOPS, 128GB of memory, and the Blackwell architecture, Thor enables applications that were previously impossible at the edge:
- **Humanoid Robotics**: Real-time control with Isaac GR00T
- **Edge LLMs**: Running 70B-120B models locally
- **Multi-Camera Analytics**: 20+ simultaneous streams
- **Autonomous Navigation**: GPU-accelerated SLAM
- **Video Intelligence**: Content search and summarization
Thor is not just faster—it unlocks entirely new categories of edge AI applications, from warehouse humanoids to private AI assistants to intelligent surveillance systems.
### **Getting Started Resources**
- **Official Docs**: https://developer.nvidia.com/jetson-thor
- **Isaac ROS**: https://nvidia-isaac-ros.github.io
- **GR00T GitHub**: https://github.com/NVIDIA/Isaac-GR00T
- **Developer Forums**: https://forums.developer.nvidia.com/c/agx-autonomous-machines/jetson-embedded-systems/