Collabnix Team The Collabnix Team is a diverse collective of Docker, Kubernetes, and IoT experts united by a passion for cloud-native technologies. With backgrounds spanning across DevOps, platform engineering, cloud architecture, and container orchestration, our contributors bring together decades of combined experience from various industries and technical domains.

Building Secure Remote MCP Servers: A Complete Guide

How to Build Secure Remote MCP Servers Effectively

As AI agents become integral to enterprise workflows, the Model Context Protocol (MCP) layer has emerged as a critical infrastructure component. MCP servers act as bridges between large language models and sensitive enterprise systems, from customer databases to financial platforms. This guide covers how to build, secure, and scale MCP servers for production environments.

Understanding MCP Security Threats

The Attack Surface

MCP servers present unique security challenges because they:

  • Handle sensitive enterprise data across multiple systems
  • Execute AI-driven actions with real business impact
  • Operate in high-trust environments with elevated privileges
  • Process dynamic requests from autonomous agents

Common Vulnerability Patterns

Confused Deputy Attacks: An attacker manipulates context or instructions so that an AI agent, or the MCP server acting for it, uses its own privileges to perform actions the attacker is not authorized to perform.

Token Relay Exploits: A stolen or replayed token issued for one service is accepted by other MCP endpoints that do not check which resource it was issued for.

Session Hijacking: Compromised session data allows attackers to impersonate legitimate AI agents.

Data Exfiltration: Poorly scoped queries expose sensitive information across tenant boundaries.

Implementing OAuth 2.1 Authentication

Why OAuth 2.1 for MCP?

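OAuth 2.1 consolidates OAuth 2.0 and its security best practices into a single profile: PKCE is required for the authorization code flow, and the implicit and resource owner password grants are removed. The server below publishes protected resource metadata so that clients can discover which authorization server to use: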

# MCP Server OAuth 2.1 Implementation
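from datetime import datetime, timezone
import os
from fastapi import FastAPI, HTTPException, Depends, Request, status
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer
import httpx
import jwt
from typing import Optional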

app = FastAPI()
security = HTTPBearer()

class MCPOAuthConfig:
    def __init__(self):
        self.jwks_url = "https://your-auth-server.com/.well-known/jwks.json"
        self.audience = "https://your-mcp-server.com"
        self.issuer = "https://your-auth-server.com"
        self.metadata_endpoint = "/.well-known/oauth-protected-resource"

oauth_config = MCPOAuthConfig()

@app.get("/.well-known/oauth-protected-resource")
async def oauth_metadata():
    """RFC 9728 - OAuth 2.0 Protected Resource Metadata"""
    return {
        "resource": oauth_config.audience,
        "authorization_servers": [oauth_config.issuer],
        "jwks_uri": oauth_config.jwks_url,
        "scopes_supported": ["mcp:read", "mcp:write", "mcp:admin"],
        # Grant and response types are advertised in the authorization
        # server's own metadata (RFC 8414), not by the protected resource
        "bearer_methods_supported": ["header"]
    }
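
OAuth 2.1 makes PKCE mandatory for the authorization code flow, so every MCP client has to derive a code challenge before it sends the user to the authorization server. A minimal sketch of the S256 method using only the standard library; the function names are illustrative and not part of any MCP SDK:

```python
import base64
import hashlib
import secrets


def generate_code_verifier() -> str:
    """Return a random 43-character, URL-safe code verifier."""
    return secrets.token_urlsafe(32)


def code_challenge_s256(verifier: str) -> str:
    """Derive the S256 challenge: BASE64URL(SHA256(verifier)) without padding."""
    digest = hashlib.sha256(verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
```

The client sends the challenge with the authorization request and the original verifier with the token request, which lets the authorization server confirm that both came from the same party.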

Dynamic Client Registration

Enable automatic client registration for scaling AI agents:

@app.post("/oauth/register")
async def dynamic_client_registration(registration_request: dict):
    """RFC 7591 - Dynamic Client Registration"""
    
    # Validate registration request
    required_fields = ["client_name", "redirect_uris", "scope"]
    if not all(field in registration_request for field in required_fields):
        raise HTTPException(status_code=400, detail="Missing required fields")
    
    # Generate client credentials
    client_id = generate_client_id()
    client_secret = generate_client_secret()
    
    # Store client metadata
    issued_at = datetime.now(timezone.utc)
    await store_client_metadata({
        "client_id": client_id,
        "client_secret": client_secret,
        "client_name": registration_request["client_name"],
        "redirect_uris": registration_request["redirect_uris"],
        "scope": registration_request["scope"],
        "created_at": issued_at
    })
    
    return {
        "client_id": client_id,
        "client_secret": client_secret,
        "client_id_issued_at": int(issued_at.timestamp()),
        # RFC 7591 expresses secret expiry as an absolute Unix timestamp
        "client_secret_expires_at": int(issued_at.timestamp()) + 3600,
        "registration_access_token": generate_registration_token()
    }
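
The handler above calls generate_client_id() and generate_client_secret() without defining them. One possible sketch, assuming random identifiers are acceptable and that only a digest of the secret is persisted; hash_client_secret and verify_client_secret are hypothetical helpers that do not appear in the listing above:

```python
import hashlib
import hmac
import secrets
import uuid


def generate_client_id() -> str:
    """Public identifier; it needs to be unique, not secret."""
    return uuid.uuid4().hex


def generate_client_secret() -> str:
    """256 bits from the OS random source, URL-safe encoded."""
    return secrets.token_urlsafe(32)


def hash_client_secret(secret: str) -> str:
    """Digest to persist in place of the plaintext secret."""
    return hashlib.sha256(secret.encode("utf-8")).hexdigest()


def verify_client_secret(presented: str, stored_hash: str) -> bool:
    """Compare in constant time to avoid leaking how many characters matched."""
    return hmac.compare_digest(hash_client_secret(presented), stored_hash)
```

A plain SHA-256 digest is reasonable here only because the secret is long and random; it would not be an appropriate choice for human-chosen passwords.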

Advanced Token Validation Strategies

Robust JWT Validation

Implement comprehensive token validation with proper error handling:

import jwt
from jwt import PyJWKClient
from fastapi import HTTPException, status

class TokenValidator:
    def __init__(self, jwks_url: str, audience: str, issuer: str):
        self.jwks_client = PyJWKClient(jwks_url)
        self.audience = audience
        self.issuer = issuer
    
    async def validate_token(self, token: str) -> dict:
        try:
            # Get signing key
            signing_key = self.jwks_client.get_signing_key_from_jwt(token)
            
            # Decode and validate
            payload = jwt.decode(
                token,
                signing_key.key,
                algorithms=["RS256"],
                audience=self.audience,
                issuer=self.issuer,
                options={
                    "verify_signature": True,
                    "verify_exp": True,
                    "verify_aud": True,
                    "verify_iss": True
                }
            )
            
            # Additional custom validations
            if not self._validate_scopes(payload.get("scope", "")):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Insufficient scope"
                )
            
            return payload
            
        except jwt.ExpiredSignatureError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token expired",
                headers={"WWW-Authenticate": f"Bearer realm=\"{self.audience}\""}
            )
        except jwt.InvalidAudienceError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid audience"
            )
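        except HTTPException:
            # Let the 403 raised for insufficient scope pass through unchanged
            raise
        except Exception:
            # Do not echo internal error details back to the caller
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token validation failed"
            )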
    
    def _validate_scopes(self, token_scopes: str) -> bool:
        required_scopes = {"mcp:read"}  # Minimum required scope
        token_scope_set = set(token_scopes.split())
        return required_scopes.issubset(token_scope_set)

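# Build the validator once so PyJWKClient can reuse its cached key set
token_validator = TokenValidator(
    oauth_config.jwks_url,
    oauth_config.audience,
    oauth_config.issuer
)

# Dependency for protected endpoints
async def get_current_user(token=Depends(security)):
    payload = await token_validator.validate_token(token.credentials)
    # Endpoints expect a UserContext (see extract_user_context), not the raw claims
    return extract_user_context(payload)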
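
Only the expired-token branch above sets a WWW-Authenticate header. RFC 6750 defines the error codes invalid_token (for 401 responses) and insufficient_scope (for 403 responses) for this header, and RFC 9728 adds a resource_metadata parameter that points clients at the metadata document. A small helper, sketched here under an illustrative name, could keep those responses consistent:

```python
from typing import Optional


def bearer_challenge(
    error: str,
    description: str,
    scope: Optional[str] = None,
    resource_metadata: Optional[str] = None,
) -> str:
    """Build a WWW-Authenticate value for a rejected bearer token."""
    params = {"error": error, "error_description": description}
    if scope:
        params["scope"] = scope
    if resource_metadata:
        params["resource_metadata"] = resource_metadata
    # Values are wrapped in double quotes, so they must not contain one themselves
    joined = ", ".join(f'{key}="{value}"' for key, value in params.items())
    return f"Bearer {joined}"
```

The returned string can be passed as headers={"WWW-Authenticate": ...} on each HTTPException raised by the validator.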

Resource-Bound Tokens (RFC 8707)

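With resource indicators, the client names the target server in the resource request parameter and the authorization server restricts the token's audience to that server. The MCP server then rejects any token whose aud claim does not include its own identifier:

@app.middleware("http")
async def validate_resource_binding(request: Request, call_next):
    if request.url.path.startswith("/api/"):
        auth_header = request.headers.get("authorization", "")
        if auth_header.startswith("Bearer "):
            token = auth_header[len("Bearer "):]
            try:
                # Early audience check only; TokenValidator verifies the signature later
                payload = jwt.decode(token, options={"verify_signature": False})
            except jwt.InvalidTokenError:
                return JSONResponse(status_code=401, content={"error": "Malformed token"})
            
            # The aud claim may be a single string or a list of strings
            audiences = payload.get("aud", [])
            if isinstance(audiences, str):
                audiences = [audiences]
            if oauth_config.audience not in audiences:
                return JSONResponse(
                    status_code=403,
                    content={"error": "Token not bound to this resource"}
                )
    
    response = await call_next(request)
    return response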
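
On the client side, RFC 8707 binding starts with the authorization request, which carries a resource parameter naming the MCP server the token is meant for. A sketch of how an agent might compose that request; the authorize endpoint path, client ID, and redirect URI are placeholders, not values from this guide:

```python
from urllib.parse import parse_qs, urlencode, urlsplit


def build_authorization_url(
    authorize_endpoint: str,
    client_id: str,
    redirect_uri: str,
    resource: str,
    scope: str,
    code_challenge: str,
    state: str,
) -> str:
    """Compose an authorization request that names the target MCP server."""
    query = urlencode({
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": scope,
        "state": state,
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
        # RFC 8707 resource indicator: the server this token is intended for
        "resource": resource,
    })
    return f"{authorize_endpoint}?{query}"


url = build_authorization_url(
    authorize_endpoint="https://your-auth-server.com/authorize",  # assumed path
    client_id="example-client",
    redirect_uri="https://agent.example.com/callback",
    resource="https://your-mcp-server.com",
    scope="mcp:read",
    code_challenge="E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM",
    state="xyz",
)
params = parse_qs(urlsplit(url).query)
```

The same resource value is repeated in the token request, and the server-side audience check only has meaning if the authorization server honors it.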

Multi-Tenant Authorization Architecture

User Context Extraction

class UserContext:
    def __init__(self, user_id: str, tenant_id: str, roles: list, scopes: list):
        self.user_id = user_id
        self.tenant_id = tenant_id
        self.roles = roles
        self.scopes = scopes
    
    def has_permission(self, resource: str, action: str) -> bool:
        """Check if user has permission for specific resource action"""
        required_scope = f"{resource}:{action}"
        return required_scope in self.scopes or "admin" in self.roles

def extract_user_context(token_payload: dict) -> UserContext:
    return UserContext(
        user_id=token_payload["sub"],
        tenant_id=token_payload.get("tenant_id"),
        roles=token_payload.get("roles", []),
        scopes=token_payload.get("scope", "").split()
    )

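from functools import wraps

# Authorization decorator
def require_permission(resource: str, action: str):
    def decorator(func):
        # wraps() keeps the endpoint signature visible so FastAPI still injects current_user
        @wraps(func)
        async def wrapper(*args, **kwargs):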
            user_context = kwargs.get("current_user")
            if not user_context.has_permission(resource, action):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Permission denied for {resource}:{action}"
                )
            return await func(*args, **kwargs)
        return wrapper
    return decorator

Data Isolation Patterns

class TenantAwareRepository:
    def __init__(self, db_session):
        self.db = db_session
    
    async def get_documents(self, user_context: UserContext, filters: dict):
        """Always scope queries by tenant"""
        query = self.db.query(Document).filter(
            Document.tenant_id == user_context.tenant_id
        )
        
        # Apply additional user-level filters
        if "manager" not in user_context.roles:
            query = query.filter(Document.owner_id == user_context.user_id)
        
        # Apply request filters
        for key, value in filters.items():
            if hasattr(Document, key):
                query = query.filter(getattr(Document, key) == value)
        
        return query.all()

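@app.get("/api/documents")
@require_permission("documents", "read")
async def get_documents(
    request: Request,
    current_user: UserContext = Depends(get_current_user)
):
    # Read filters from the query string; a dict parameter on a GET route
    # would be parsed as a request body
    filters = dict(request.query_params)
    repo = TenantAwareRepository(db_session)  # db_session: the application's database session
    documents = await repo.get_documents(current_user, filters)
    return {"documents": documents}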
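
The repository above filters on whatever tenant_id the token carries, which may be missing, and accepts any model attribute as a filter key. A stricter variant, sketched here with the standard library's sqlite3 rather than the ORM used above, refuses to run without a tenant and allowlists the filterable columns. The table layout and ALLOWED_FILTERS are assumptions made for the example:

```python
import sqlite3

# Columns a caller may filter on; any other key is ignored
ALLOWED_FILTERS = {"status", "owner_id"}


def fetch_documents(conn, tenant_id, filters):
    """Return (id, title) rows for one tenant, applying only allowlisted filters."""
    if not tenant_id:
        raise ValueError("tenant_id is required for every query")

    clauses = ["tenant_id = ?"]
    params = [tenant_id]
    for column, value in filters.items():
        if column in ALLOWED_FILTERS:
            # Column names come from the allowlist; values are bound parameters
            clauses.append(f"{column} = ?")
            params.append(value)

    sql = "SELECT id, title FROM documents WHERE " + " AND ".join(clauses) + " ORDER BY id"
    return conn.execute(sql, params).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE documents ("
    "id INTEGER PRIMARY KEY, tenant_id TEXT, owner_id TEXT, status TEXT, title TEXT)"
)
conn.executemany(
    "INSERT INTO documents (tenant_id, owner_id, status, title) VALUES (?, ?, ?, ?)",
    [
        ("tenant-a", "u1", "draft", "A1"),
        ("tenant-a", "u2", "final", "A2"),
        ("tenant-b", "u3", "final", "B1"),
    ],
)
```

A caller-supplied tenant_id filter is dropped because it is not in the allowlist, so the tenant always comes from the authenticated context.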

Container Orchestration and Auto-Scaling

Kubernetes Deployment Configuration

# mcp-server-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-server
  labels:
    app: mcp-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-server
  template:
    metadata:
      labels:
        app: mcp-server
    spec:
      containers:
      - name: mcp-server
        image: your-registry/mcp-server:latest
        ports:
        - containerPort: 8000
        env:
        - name: OAUTH_JWKS_URL
          valueFrom:
            configMapKeyRef:
              name: mcp-config
              key: jwks-url
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: mcp-secrets
              key: database-url
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: mcp-server-service
spec:
  selector:
    app: mcp-server
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer

Horizontal Pod Autoscaler

# mcp-hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-server-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-server
  minReplicas: 3
  maxReplicas: 50
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 100
        periodSeconds: 15
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 10
        periodSeconds: 60
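
To reason about the thresholds above, it helps to work through the documented HPA formula: desired replicas = ceil(current replicas × current metric / target metric), with no change when the ratio is within a tolerance (0.1 by default) of 1.0. The sketch below applies that formula with the min and max from this manifest; it ignores the behavior policies and stabilization windows, so treat it as arithmetic rather than a simulation of the controller:

```python
import math


def desired_replicas(current, current_util, target_util,
                     min_replicas=3, max_replicas=50, tolerance=0.1):
    """Apply the core HPA scaling formula and clamp to the replica bounds."""
    ratio = current_util / target_util
    if abs(ratio - 1.0) <= tolerance:
        return current  # close enough to the target, so no scaling
    return max(min_replicas, min(max_replicas, math.ceil(current * ratio)))


# 3 pods at 140% average CPU against a 70% target
cpu_based = desired_replicas(3, 140, 70)
# 3 pods at 60% average memory against an 80% target
memory_based = desired_replicas(3, 60, 80)
# With several metrics, the HPA uses the largest proposal
proposed = max(cpu_based, memory_based)
```

Here the CPU metric proposes 6 replicas and the memory metric proposes 3, so the autoscaler would aim for 6, subject to the scaleUp policy limiting how quickly it gets there.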

Health Check Implementation

@app.get("/health")
async def health_check():
    """Kubernetes liveness probe endpoint"""
    try:
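        # Check critical dependencies (a failed liveness probe restarts the pod,
        # so checks that a restart cannot fix are better placed in /ready)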
        await check_database_connection()
        await check_oauth_provider()
        await check_external_services()
        
        return {
            "status": "healthy",
            "timestamp": datetime.utcnow().isoformat(),
            "version": os.getenv("APP_VERSION", "unknown")
        }
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Health check failed: {e}")

@app.get("/ready")
async def readiness_check():
    """Kubernetes readiness probe endpoint"""
    try:
        # Check if server is ready to accept traffic
        await verify_oauth_configuration()
        await check_cache_connectivity()
        
        return {"status": "ready"}
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Not ready: {e}")
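
The probe handlers above await each dependency check in turn, so one hung dependency can stall the whole probe past the kubelet's timeout. A sketch of running the checks concurrently with a per-check timeout; run_checks and the three stand-in check functions are hypothetical and only simulate real dependencies:

```python
import asyncio


async def run_checks(checks, timeout=2.0):
    """Run named async checks concurrently; return {name: "ok" or an error string}."""

    async def run_one(name, check):
        try:
            await asyncio.wait_for(check(), timeout=timeout)
            return name, "ok"
        except asyncio.TimeoutError:
            return name, f"timed out after {timeout}s"
        except Exception as exc:
            return name, f"failed: {exc}"

    results = await asyncio.gather(*(run_one(n, c) for n, c in checks.items()))
    return dict(results)


# Stand-ins for real dependency checks
async def check_database():
    await asyncio.sleep(0.01)


async def check_cache():
    raise ConnectionError("connection refused")


async def check_slow_service():
    await asyncio.sleep(1)
```

A handler can return 503 when any value differs from "ok" and include the dictionary in the response body, which makes probe failures easier to diagnose from logs.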

AI Gateway Implementation

Kong Gateway Configuration

# kong-gateway.yaml
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
  name: mcp-rate-limiting
plugin: rate-limiting
config:
  minute: 1000
  hour: 10000
  policy: redis
  redis_host: redis-cluster
  fault_tolerant: true
---
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
  name: mcp-jwt-auth
plugin: jwt
config:
  key_claim_name: kid
  secret_is_base64: false
  # The jwt plugin can verify only exp and nbf; the MCP server checks aud itself
  claims_to_verify:
    - exp
    - nbf
---
apiVersion: configuration.konghq.com/v1
kind: KongIngress
metadata:
  name: mcp-ingress
spec:
  upstream:
    healthchecks:
      active:
        healthy:
          successes: 3
        unhealthy:
          http_failures: 3
        http_path: /health
        interval: 10
  proxy:
    connect_timeout: 10000
    read_timeout: 60000
    write_timeout: 60000
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: mcp-server-ingress
  annotations:
    konghq.com/plugins: mcp-rate-limiting,mcp-jwt-auth
spec:
  ingressClassName: kong
  rules:
  - host: mcp-api.yourcompany.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: mcp-server-service
            port:
              number: 80

Custom Gateway Middleware

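# gateway_middleware.py
from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse
import time
import redis.asyncio as redis  # async client, so incr/expire can be awaited
from typing import Dict, Any

class AIGatewayMiddleware:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.rate_limits = {
            "default": {"requests": 100, "window": 60},
            "premium": {"requests": 1000, "window": 60}
        }
    
    async def __call__(self, request: Request, call_next):
        start_time = time.time()
        
        try:
            # Rate limiting
            await self._check_rate_limit(request)
            
            # Request validation
            await self._validate_request(request)
        except HTTPException as exc:
            # Exceptions raised inside middleware bypass FastAPI's handlers,
            # so translate them into a response here
            return JSONResponse(
                status_code=exc.status_code,
                content={"detail": exc.detail},
                headers=exc.headers
            )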
        
        # Process request
        response = await call_next(request)
        
        # Add security headers
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-XSS-Protection"] = "1; mode=block"
        
        # Log metrics
        process_time = time.time() - start_time
        await self._log_metrics(request, response, process_time)
        
        return response
    
    async def _check_rate_limit(self, request: Request):
        client_id = self._extract_client_id(request)
        tier = await self._get_client_tier(client_id)
        
        limit_config = self.rate_limits.get(tier, self.rate_limits["default"])
        
        key = f"rate_limit:{client_id}"
        current = await self.redis.incr(key)
        
        if current == 1:
            await self.redis.expire(key, limit_config["window"])
        
        if current > limit_config["requests"]:
            raise HTTPException(
                status_code=429,
                detail="Rate limit exceeded",
                headers={
                    "Retry-After": str(limit_config["window"]),
                    "X-RateLimit-Limit": str(limit_config["requests"]),
                    "X-RateLimit-Remaining": "0"
                }
            )
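
The limiter above is a fixed-window counter: the first request in a window creates the key with an expiry, and later requests increment it. It always reports the full window as Retry-After, even when the window is about to end. The in-memory sketch below mirrors the same pattern and returns the time actually remaining; FixedWindowLimiter is an illustrative stand-in for the Redis logic, not a replacement for it across multiple replicas:

```python
import math
import time


class FixedWindowLimiter:
    """In-memory stand-in for the Redis INCR + EXPIRE pattern."""

    def __init__(self, limit, window, clock=time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock
        self._windows = {}  # client_id -> (window_start, count)

    def check(self, client_id):
        """Return (allowed, remaining, retry_after_seconds)."""
        now = self.clock()
        start, count = self._windows.get(client_id, (now, 0))
        if now - start >= self.window:
            start, count = now, 0  # the previous window has expired
        count += 1
        self._windows[client_id] = (start, count)
        if count > self.limit:
            retry_after = math.ceil(start + self.window - now)
            return False, 0, retry_after
        return True, self.limit - count, 0
```

With Redis, the equivalent of the remaining time is the key's TTL, which can be read when the limit is exceeded and used for the Retry-After header.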

Enterprise Secrets Management

Azure Key Vault Integration

from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
import asyncio
import logging
import os
import time
from typing import Any, Dict, Optional

logger = logging.getLogger(__name__)

class SecretsManager:
    def __init__(self, vault_url: str):
        self.vault_url = vault_url
        self.credential = DefaultAzureCredential()
        self.client = SecretClient(vault_url=vault_url, credential=self.credential)
        self._cache: Dict[str, Any] = {}
        self._cache_ttl = 300  # 5 minutes
    
    async def get_secret(self, secret_name: str) -> Optional[str]:
        """Get secret with caching and automatic refresh"""
        cache_key = f"secret:{secret_name}"
        
        # Check cache first
        if cache_key in self._cache:
            cached_data = self._cache[cache_key]
            if time.time() - cached_data["timestamp"] < self._cache_ttl:
                return cached_data["value"]
        
        try:
            # Fetch from Key Vault
            secret = await asyncio.to_thread(
                self.client.get_secret, secret_name
            )
            
            # Cache the result
            self._cache[cache_key] = {
                "value": secret.value,
                "timestamp": time.time()
            }
            
            return secret.value
            
        except Exception as e:
            logger.error(f"Failed to retrieve secret {secret_name}: {e}")
            # Return cached value if available, even if expired
            if cache_key in self._cache:
                return self._cache[cache_key]["value"]
            raise
    
    async def get_database_config(self) -> dict:
        """Get database configuration from secrets"""
        return {
            "host": await self.get_secret("db-host"),
            "port": int(await self.get_secret("db-port")),
            "database": await self.get_secret("db-name"),
            "username": await self.get_secret("db-username"),
            "password": await self.get_secret("db-password")
        }

# Initialize secrets manager
secrets_manager = SecretsManager(
    vault_url=os.getenv("AZURE_KEY_VAULT_URL")
)

@app.on_event("startup")
async def load_secrets():
    """Load critical secrets at startup"""
    try:
        global oauth_config
        oauth_config.client_secret = await secrets_manager.get_secret("oauth-client-secret")
        oauth_config.jwt_signing_key = await secrets_manager.get_secret("jwt-signing-key")
        
        # Validate all required secrets are present
        required_secrets = ["oauth-client-secret", "jwt-signing-key", "db-password"]
        for secret_name in required_secrets:
            value = await secrets_manager.get_secret(secret_name)
            if not value:
                raise ValueError(f"Required secret {secret_name} not found")
                
        logger.info("All required secrets loaded successfully")
        
    except Exception as e:
        logger.error(f"Failed to load secrets: {e}")
        raise

Kubernetes Secrets with CSI Driver

# secretstore.yaml
apiVersion: secrets-store.csi.x-k8s.io/v1
kind: SecretProviderClass
metadata:
  name: mcp-secrets
spec:
  provider: azure
  parameters:
    usePodIdentity: "false"
    useVMManagedIdentity: "true"
    userAssignedIdentityClientID: "your-identity-client-id"
    keyvaultName: "your-keyvault-name"
    tenantId: "your-tenant-id"
    objects: |
      array:
        - |
          objectName: oauth-client-secret
          objectType: secret
        - |
          objectName: jwt-signing-key
          objectType: secret
        - |
          objectName: db-password
          objectType: secret
  secretObjects:
  - secretName: mcp-app-secrets
    type: Opaque
    data:
    - objectName: oauth-client-secret
      key: oauth-client-secret
    - objectName: jwt-signing-key
      key: jwt-signing-key
    - objectName: db-password
      key: db-password

Production Observability Stack

Structured Logging with Correlation IDs

import structlog
import time
import uuid
from contextvars import ContextVar
from fastapi import Request

# Context variable for correlation ID
correlation_id_var: ContextVar[str] = ContextVar('correlation_id', default='')

class CorrelationIDMiddleware:
    async def __call__(self, request: Request, call_next):
        # Generate or extract correlation ID
        correlation_id = request.headers.get('X-Correlation-ID') or str(uuid.uuid4())
        correlation_id_var.set(correlation_id)
        
        # Add to response headers
        response = await call_next(request)
        response.headers['X-Correlation-ID'] = correlation_id
        return response

# Configure structured logging
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.dev.ConsoleRenderer()  # Use JSONRenderer for production
    ],
    wrapper_class=structlog.make_filtering_bound_logger(20),  # INFO level
    logger_factory=structlog.PrintLoggerFactory(),
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

@app.middleware("http")
async def logging_middleware(request: Request, call_next):
    start_time = time.time()
    
    # Log request
    logger.info(
        "Request started",
        method=request.method,
        url=str(request.url),
        client_ip=request.client.host if request.client else None,
        user_agent=request.headers.get("user-agent"),
        correlation_id=correlation_id_var.get()
    )
    
    try:
        response = await call_next(request)
        
        # Log successful response
        process_time = time.time() - start_time
        logger.info(
            "Request completed",
            status_code=response.status_code,
            process_time=process_time,
            correlation_id=correlation_id_var.get()
        )
        
        return response
        
    except Exception as e:
        # Log error
        process_time = time.time() - start_time
        logger.error(
            "Request failed",
            error=str(e),
            error_type=type(e).__name__,
            process_time=process_time,
            correlation_id=correlation_id_var.get()
        )
        raise
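
The correlation ID lives in a ContextVar rather than a global because each asyncio task works on its own copy of the context, so concurrent requests cannot overwrite each other's IDs. The following standalone sketch demonstrates that isolation; handle_request is a hypothetical stand-in for a real request handler:

```python
import asyncio
import uuid
from contextvars import ContextVar

correlation_id_var: ContextVar[str] = ContextVar("correlation_id", default="")


async def handle_request(incoming_id=None):
    """Simulate one request: set the ID, yield to the event loop, read it back."""
    correlation_id = incoming_id or str(uuid.uuid4())
    correlation_id_var.set(correlation_id)
    await asyncio.sleep(0.01)  # other requests run while this one waits
    return correlation_id_var.get()


async def main():
    # gather() wraps each coroutine in its own task, each with its own context copy
    return await asyncio.gather(
        handle_request("req-1"),
        handle_request("req-2"),
        handle_request(),
    )


results = asyncio.run(main())
```

Each simulated request reads back the ID it set, even though all three were suspended at the same time.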

OpenTelemetry Distributed Tracing

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

# Configure tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Configure Jaeger exporter
jaeger_exporter = JaegerExporter(
    agent_host_name=os.getenv("JAEGER_AGENT_HOST", "localhost"),
    agent_port=int(os.getenv("JAEGER_AGENT_PORT", "6831")),
)

span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Auto-instrument FastAPI
FastAPIInstrumentor.instrument_app(app)
RequestsInstrumentor().instrument()
SQLAlchemyInstrumentor().instrument(engine=db_engine)

# Custom tracing for business logic
@app.post("/api/process-request")  # POST, because request_data arrives as a JSON body
async def process_request(request_data: dict, current_user: UserContext = Depends(get_current_user)):
    with tracer.start_as_current_span("process_mcp_request") as span:
        span.set_attribute("user.id", current_user.user_id)
        span.set_attribute("tenant.id", current_user.tenant_id)
        span.set_attribute("request.type", request_data.get("type", "unknown"))
        
        try:
            # Process request with nested spans
            with tracer.start_as_current_span("validate_input"):
                await validate_request_data(request_data)
            
            with tracer.start_as_current_span("execute_business_logic"):
                result = await execute_mcp_operation(request_data, current_user)
            
            span.set_attribute("result.status", "success")
            return result
            
        except Exception as e:
            span.set_attribute("result.status", "error")
            span.set_attribute("error.message", str(e))
            raise

Prometheus Metrics

from prometheus_client import Counter, Histogram, Gauge, generate_latest
import time

# Define metrics
REQUEST_COUNT = Counter(
    'mcp_requests_total',
    'Total number of MCP requests',
    ['method', 'endpoint', 'status_code', 'tenant_id']
)

REQUEST_DURATION = Histogram(
    'mcp_request_duration_seconds',
    'Request duration in seconds',
    ['method', 'endpoint']
)

ACTIVE_CONNECTIONS = Gauge(
    'mcp_active_connections',
    'Number of active connections'
)

TOKEN_VALIDATION_ERRORS = Counter(
    'mcp_token_validation_errors_total',
    'Number of token validation errors',
    ['error_type']
)

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()
    
    ACTIVE_CONNECTIONS.inc()
    
    try:
        response = await call_next(request)
        
        # Record metrics
        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=request.url.path,
            status_code=response.status_code,
            tenant_id=getattr(request.state, 'tenant_id', 'unknown')
        ).inc()
        
        REQUEST_DURATION.labels(
            method=request.method,
            endpoint=request.url.path
        ).observe(time.time() - start_time)
        
        return response
        
    finally:
        ACTIVE_CONNECTIONS.dec()

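from fastapi import Response
from prometheus_client import CONTENT_TYPE_LATEST

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    # CONTENT_TYPE_LATEST carries the exposition-format version Prometheus expects
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)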
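
Every distinct label value creates a separate time series in Prometheus, so labelling by the raw request.url.path turns /api/documents/1 and /api/documents/2 into two series. One way to keep cardinality bounded is to collapse identifier-like path segments before using the path as a label. This is a sketch with an assumed helper name; the route template from the framework is an alternative where it is available:

```python
import re

_UUID = re.compile(
    r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
)


def normalize_endpoint(path: str) -> str:
    """Replace numeric and UUID path segments with a fixed placeholder."""
    segments = []
    for segment in path.split("/"):
        if segment.isdigit() or _UUID.match(segment):
            segments.append("{id}")
        else:
            segments.append(segment)
    return "/".join(segments)
```

The middleware would then pass endpoint=normalize_endpoint(request.url.path) to both REQUEST_COUNT and REQUEST_DURATION.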

Performance Benchmarking

Load Testing with Locust

# locustfile.py
from locust import HttpUser, task, between
import random
import jwt
import time

class MCPUser(HttpUser):
    wait_time = between(1, 5)
    
    def on_start(self):
        """Generate test token"""
        self.token = self.generate_test_token()
        self.headers = {"Authorization": f"Bearer {self.token}"}
    
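    def generate_test_token(self):
        # The TokenValidator shown earlier accepts only RS256 tokens checked
        # against the issuer's JWKS, so this HS256 token is useful only against
        # a test deployment configured to trust the shared secret below
        payload = {
            "sub": f"test-user-{random.randint(1, 1000)}",
            "tenant_id": f"tenant-{random.randint(1, 10)}",
            "scope": "mcp:read mcp:write",
            "exp": int(time.time()) + 3600,
            "aud": "https://your-mcp-server.com"
        }
        return jwt.encode(payload, "secret", algorithm="HS256")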
    
    @task(3)
    def read_documents(self):
        """Simulate document read operations"""
        response = self.client.get(
            "/api/documents",
            headers=self.headers,
            params={"limit": 10}
        )
        
        if response.status_code == 200:
            documents = response.json().get("documents", [])
            if documents:
                # Follow up with detail request
                doc_id = random.choice(documents)["id"]
                self.client.get(f"/api/documents/{doc_id}", headers=self.headers)
    
    @task(1)
    def create_document(self):
        """Simulate document creation"""
        document_data = {
            "title": f"Test Document {random.randint(1, 10000)}",
            "content": "This is a test document for load testing",
            "tags": ["test", "load-test"]
        }
        
        self.client.post(
            "/api/documents",
            json=document_data,
            headers=self.headers
        )
    
    @task(2)
    def search_documents(self):
        """Simulate search operations"""
        search_terms = ["test", "document", "data", "report", "analysis"]
        query = random.choice(search_terms)
        
        self.client.get(
            "/api/search",
            headers=self.headers,
            params={"q": query, "limit": 20}
        )

# Run with: locust -f locustfile.py --host=https://your-mcp-server.com

Performance Monitoring Dashboard

# performance_monitor.py
import asyncio
import psutil
import time
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class PerformanceMetrics:
    timestamp: float
    cpu_percent: float
    memory_percent: float
    disk_io_read: int
    disk_io_write: int
    network_io_sent: int
    network_io_recv: int
    active_connections: int
    response_time_p95: float
    error_rate: float

class PerformanceMonitor:
    def __init__(self):
        self.metrics_history: List[PerformanceMetrics] = []
        self.response_times: List[float] = []
        self.error_count = 0
        self.request_count = 0
    
    async def collect_metrics(self):
        """Collect system and application metrics"""
        while True:
            try:
                # System metrics (sample CPU over 1 s in a worker thread so the
                # event loop is not blocked)
                cpu_percent = await asyncio.to_thread(psutil.cpu_percent, 1)
                memory = psutil.virtual_memory()
                disk_io = psutil.disk_io_counters()
                network_io = psutil.net_io_counters()
                
                # Application metrics
                p95_response_time = self._calculate_p95_response_time()
                error_rate = (self.error_count / max(self.request_count, 1)) * 100
                
                metrics = PerformanceMetrics(
                    timestamp=time.time(),
                    cpu_percent=cpu_percent,
                    memory_percent=memory.percent,
                    disk_io_read=disk_io.read_bytes if disk_io else 0,
                    disk_io_write=disk_io.write_bytes if disk_io else 0,
                    network_io_sent=network_io.bytes_sent if network_io else 0,
                    network_io_recv=network_io.bytes_recv if network_io else 0,
                    # Assumes request middleware keeps self.active_connections up to date
                    active_connections=getattr(self, "active_connections", 0),
                    response_time_p95=p95_response_time,
                    error_rate=error_rate
                )
                
                self.metrics_history.append(metrics)
                
                # Keep only last 1000 metrics
                if len(self.metrics_history) > 1000:
                    self.metrics_history = self.metrics_history[-1000:]
                
                # Reset counters periodically
                if len(self.response_times) > 10000:
                    self.response_times = self.response_times[-1000:]
                    self.error_count = int(self.error_count * 0.1)
                    self.request_count = int(self.request_count * 0.1)
                
                await asyncio.sleep(10)  # Collect every 10 seconds
                
            except Exception as e:
                logger.error(f"Error collecting metrics: {e}")
                await asyncio.sleep(10)
    
    def _calculate_p95_response_time(self) -> float:
        if not self.response_times:
            return 0.0
        
        sorted_times = sorted(self.response_times)
        index = int(0.95 * len(sorted_times))
        return sorted_times[index] if index < len(sorted_times) else 0.0
    
    def record_request(self, response_time: float, is_error: bool = False):
        """Record request metrics"""
        self.response_times.append(response_time)
        self.request_count += 1
        if is_error:
            self.error_count += 1

# Initialize performance monitor
perf_monitor = PerformanceMonitor()

@app.on_event("startup")
async def start_performance_monitoring():
    asyncio.create_task(perf_monitor.collect_metrics())

@app.get("/api/performance/metrics")
async def get_performance_metrics():
    """Get current performance metrics"""
    if not perf_monitor.metrics_history:
        return {"message": "No metrics available yet"}
    
    latest = perf_monitor.metrics_history[-1]
    return {
        "current": {
            "cpu_percent": latest.cpu_percent,
            "memory_percent": latest.memory_percent,
            "response_time_p95": latest.response_time_p95,
            "error_rate": latest.error_rate,
            "active_connections": latest.active_connections
        },
        "history": perf_monitor.metrics_history[-100:]  # Last 100 data points
    }
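
The monitor only reports on requests that are passed to `record_request`, and the listing above does not show where that call happens. One way to wire it in, sketched here under assumptions, is a decorator that times each async handler and reports the outcome. The `timed` decorator and the `RecorderStub` class are hypothetical names introduced for illustration; in the real server you would pass `perf_monitor` instead of the stub.

```python
import asyncio
import functools
import time


class RecorderStub:
    """Hypothetical stand-in with the same record_request() signature as PerformanceMonitor."""

    def __init__(self):
        self.response_times = []
        self.request_count = 0
        self.error_count = 0

    def record_request(self, response_time: float, is_error: bool = False):
        self.response_times.append(response_time)
        self.request_count += 1
        if is_error:
            self.error_count += 1


def timed(monitor):
    """Decorator factory: time an async handler and report the result to `monitor`."""
    def decorator(handler):
        @functools.wraps(handler)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            is_error = False
            try:
                return await handler(*args, **kwargs)
            except Exception:
                is_error = True
                raise  # Re-raise so the framework's own error handling still runs
            finally:
                # Runs on both success and failure, so every call is counted once
                monitor.record_request(time.perf_counter() - start, is_error)
        return wrapper
    return decorator


async def _demo():
    monitor = RecorderStub()

    @timed(monitor)
    async def ok_handler():
        await asyncio.sleep(0.01)
        return "ok"

    @timed(monitor)
    async def failing_handler():
        raise ValueError("boom")

    await ok_handler()
    try:
        await failing_handler()
    except ValueError:
        pass
    return monitor


demo_monitor = asyncio.run(_demo())
print(demo_monitor.request_count, demo_monitor.error_count)  # prints: 2 1
```

Note that this sketch counts every raised exception as an error, including exceptions a framework would turn into a 4xx response. Whether client errors should affect the error rate is a policy decision to make explicitly.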

Security Checklist

Pre-Production Security Audit

# security_audit.py
import asyncio
import ssl
import socket
import requests
from typing import List, Dict, Any
import subprocess

class SecurityAudit:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.results: List[Dict[str, Any]] = []
    
    async def run_full_audit(self) -> Dict[str, Any]:
        """Run comprehensive security audit"""
        audit_results = {}
        
        # SSL/TLS Configuration
        audit_results["ssl_audit"] = await self._audit_ssl_configuration()
        
        # HTTP Security Headers
        audit_results["headers_audit"] = await self._audit_security_headers()
        
        # Authentication Flows
        audit_results["auth_audit"] = await self._audit_authentication()
        
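        # API Security
        # NOTE: _audit_api_security() is not defined in this listing, so report
        # the check as skipped instead of raising AttributeError
        api_audit = getattr(self, "_audit_api_security", None)
        if api_audit is not None:
            audit_results["api_audit"] = await api_audit()
        else:
            audit_results["api_audit"] = {"status": "skip", "error": "API security audit not implemented"}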
        
        # Dependencies
        audit_results["dependencies_audit"] = await self._audit_dependencies()
        
        return audit_results
    
    async def _audit_ssl_configuration(self) -> Dict[str, Any]:
        """Audit SSL/TLS configuration"""
        try:
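            # Parse the URL so that ports and paths do not end up in the hostname
            from urllib.parse import urlparse
            url = self.base_url if "://" in self.base_url else f"https://{self.base_url}"
            parsed = urlparse(url)
            hostname = parsed.hostname
            port = parsed.port or 443
            context = ssl.create_default_context()
            
            # A timeout keeps the audit from hanging on an unreachable host
            with socket.create_connection((hostname, port), timeout=10) as sock: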
                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                    cert = ssock.getpeercert()
                    cipher = ssock.cipher()
                    
                    return {
                        "status": "pass",
                        "certificate_info": {
                            "subject": dict(x[0] for x in cert["subject"]),
                            "issuer": dict(x[0] for x in cert["issuer"]),
                            "version": cert["version"],
                            "expires": cert["notAfter"]
                        },
                        "cipher_suite": cipher,
                        "tls_version": ssock.version()
                    }
        except Exception as e:
            return {"status": "fail", "error": str(e)}
    
    async def _audit_security_headers(self) -> Dict[str, Any]:
        """Audit HTTP security headers"""
        try:
            response = requests.get(self.base_url, timeout=10)
            headers = response.headers
            
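            required_headers = {
                "X-Content-Type-Options": "nosniff",
                "X-Frame-Options": ["DENY", "SAMEORIGIN"],
                # Legacy header: current browsers ignore it and OWASP guidance now
                # favors "0", so accept either value rather than forcing the old one
                "X-XSS-Protection": ["0", "1; mode=block"],
                "Strict-Transport-Security": None,  # Any value accepted; presence is required
                "Content-Security-Policy": None  # Any value accepted; presence is required
            }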
            
            audit_results = {"status": "pass", "headers": {}}
            
            for header, expected_value in required_headers.items():
                if header not in headers:
                    audit_results["headers"][header] = {"status": "missing"}
                    audit_results["status"] = "fail"
                    continue
                
                actual_value = headers[header]
                if expected_value is None:
                    # Presence is enough; the value is reported for manual review
                    status = "present"
                elif isinstance(expected_value, list):
                    status = "pass" if actual_value in expected_value else "fail"
                else:
                    status = "pass" if actual_value == expected_value else "fail"
                
                audit_results["headers"][header] = {"status": status, "value": actual_value}
                if status == "fail":
                    # A wrong value must fail the overall audit, not only a missing header
                    audit_results["status"] = "fail"
            
            return audit_results
            
        except Exception as e:
            return {"status": "error", "error": str(e)}
    
    async def _audit_authentication(self) -> Dict[str, Any]:
        """Audit authentication implementation"""
        audit_results = {"tests": []}
        
        # Test 1: Unauthenticated request should return 401
        try:
            response = requests.get(f"{self.base_url}/api/documents", timeout=10)
            if response.status_code == 401:
                audit_results["tests"].append({
                    "name": "Unauthenticated access blocked",
                    "status": "pass"
                })
            else:
                audit_results["tests"].append({
                    "name": "Unauthenticated access blocked",
                    "status": "fail",
                    "details": f"Expected 401, got {response.status_code}"
                })
        except Exception as e:
            audit_results["tests"].append({
                "name": "Unauthenticated access blocked",
                "status": "error",
                "error": str(e)
            })
        
        # Test 2: Invalid token should return 401
        try:
            headers = {"Authorization": "Bearer invalid-token"}
            response = requests.get(f"{self.base_url}/api/documents", headers=headers, timeout=10)
            if response.status_code == 401:
                audit_results["tests"].append({
                    "name": "Invalid token rejected",
                    "status": "pass"
                })
            else:
                audit_results["tests"].append({
                    "name": "Invalid token rejected",
                    "status": "fail",
                    "details": f"Expected 401, got {response.status_code}"
                })
        except Exception as e:
            audit_results["tests"].append({
                "name": "Invalid token rejected",
                "status": "error",
                "error": str(e)
            })
        
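        # Summarize: the audit passes only if every individual test passed
        audit_results["status"] = (
            "pass" if all(t["status"] == "pass" for t in audit_results["tests"]) else "fail"
        )
        return audit_results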
    
    async def _audit_dependencies(self) -> Dict[str, Any]:
        """Audit dependencies for known vulnerabilities"""
        try:
            # Run safety check (requires safety package)
            result = subprocess.run(
                ["safety", "check", "--json"],
                capture_output=True,
                text=True,
                timeout=30
            )
            
            if result.returncode == 0:
                return {"status": "pass", "vulnerabilities": []}
            else:
                vulnerabilities = result.stdout
                return {"status": "fail", "vulnerabilities": vulnerabilities}
                
        except subprocess.TimeoutExpired:
            return {"status": "error", "error": "Safety check timed out"}
        except FileNotFoundError:
            return {"status": "skip", "error": "Safety tool not installed"}
        except Exception as e:
            return {"status": "error", "error": str(e)}

# Usage
async def run_security_audit():
    audit = SecurityAudit("https://your-mcp-server.com")
    results = await audit.run_full_audit()
    
    print("Security Audit Results:")
    print("=" * 50)
    
    for category, result in results.items():
        print(f"\n{category.upper()}:")
        if result.get("status") == "pass":
            print("✅ PASS")
        elif result.get("status") == "fail":
            print("❌ FAIL")
        else:
            print("⚠️  WARNING")
        
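        if "error" in result:
            print(f"  Error: {result['error']}")


if __name__ == "__main__":
    # run_security_audit() is a coroutine, so it needs an event loop to execute
    asyncio.run(run_security_audit())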
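
To gate a deployment pipeline on the audit, the result dictionary can be reduced to a process exit code. The sketch below is one possible approach and not part of the original script: `audit_exit_code` is a hypothetical helper, and the policy it encodes (any `fail` or `error` blocks the release, `skip` does not) is an assumption to adapt to your own risk tolerance.

```python
from typing import Any, Dict

# Statuses that should block a release (assumed policy; adjust as needed)
BLOCKING_STATUSES = {"fail", "error"}


def audit_exit_code(results: Dict[str, Dict[str, Any]]) -> int:
    """Return 1 if any audit category or nested test has a blocking status, else 0."""
    for category in results.values():
        if category.get("status") in BLOCKING_STATUSES:
            return 1
        # The authentication audit reports individual outcomes under "tests"
        for test in category.get("tests", []):
            if test.get("status") in BLOCKING_STATUSES:
                return 1
    return 0


sample_results = {
    "ssl_audit": {"status": "pass"},
    "auth_audit": {"tests": [{"name": "Invalid token rejected", "status": "fail"}]},
    "dependencies_audit": {"status": "skip", "error": "Safety tool not installed"},
}
print(audit_exit_code(sample_results))  # prints 1: the nested auth test failed
```

In a CI job, calling `sys.exit(audit_exit_code(results))` after `run_full_audit()` would fail the pipeline step whenever a blocking finding is present.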

Deployment Security Checklist

Work through the following checklist before each production deployment:

## MCP Server Security Checklist

### Authentication & Authorization
- [ ] OAuth 2.1 flow properly implemented
- [ ] JWT validation includes all required claims
- [ ] Token expiration properly enforced
- [ ] Resource binding (RFC 8707) implemented
- [ ] Multi-tenant isolation verified
- [ ] Role-based access control configured
- [ ] Dynamic client registration secured

### Network Security
- [ ] HTTPS enforced with valid TLS certificate
- [ ] Security headers properly configured
- [ ] CORS policy restrictive and appropriate
- [ ] Rate limiting configured
- [ ] API gateway deployed and configured
- [ ] Private networking for internal services

### Data Protection
- [ ] All database queries scoped by tenant
- [ ] Sensitive data encrypted at rest
- [ ] Audit logging for all data access
- [ ] Data retention policies implemented
- [ ] Backup encryption configured

### Infrastructure Security
- [ ] Container images scanned for vulnerabilities
- [ ] Kubernetes RBAC configured
- [ ] Network policies implemented
- [ ] Pod security standards enforced
- [ ] Secrets managed via secure store
- [ ] Regular security updates scheduled

### Monitoring & Alerting
- [ ] Structured logging implemented
- [ ] Distributed tracing configured
- [ ] Security event monitoring active
- [ ] Performance metrics collected
- [ ] Alert thresholds configured
- [ ] Incident response procedures documented

### Compliance & Governance
- [ ] Data classification performed
- [ ] Privacy impact assessment completed
- [ ] Security controls documented
- [ ] Regular security audits scheduled
- [ ] Incident response plan tested
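
Several of the authentication items above (required claims, token expiration, resource binding) can be verified with a small, testable function. The following is a minimal sketch under stated assumptions: `check_claims` and `REQUIRED_CLAIMS` are hypothetical names, the required set is an assumed minimum, and the claims are assumed to come from a token whose signature has already been verified elsewhere.

```python
import time
from typing import Any, Dict, List, Optional

REQUIRED_CLAIMS = ("iss", "sub", "aud", "exp")  # Assumed minimum set


def check_claims(
    claims: Dict[str, Any],
    expected_issuer: str,
    expected_audience: str,
    now: Optional[float] = None,
) -> List[str]:
    """Return a list of problems found in already signature-verified JWT claims."""
    now = time.time() if now is None else now
    problems = [f"missing claim: {name}" for name in REQUIRED_CLAIMS if name not in claims]
    if problems:
        return problems

    if claims["iss"] != expected_issuer:
        problems.append("unexpected issuer")

    # "aud" may be a single string or a list of strings (RFC 7519)
    audiences = claims["aud"] if isinstance(claims["aud"], list) else [claims["aud"]]
    if expected_audience not in audiences:
        problems.append("token not bound to this resource")

    if claims["exp"] <= now:
        problems.append("token expired")

    return problems


example_claims = {
    "iss": "https://auth.example.com",
    "sub": "agent-123",
    "aud": "https://mcp.example.com",
    "exp": 2_000_000_000,
}
print(check_claims(
    example_claims,
    "https://auth.example.com",
    "https://mcp.example.com",
    now=1_700_000_000,
))  # prints []
```

Returning a list of problems rather than a boolean makes the result easy to log for the audit trail and to assert on in tests. The example URLs and subject are placeholders.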

Conclusion

Building secure and scalable MCP servers requires careful attention to authentication flows, multi-tenant isolation, container orchestration, and comprehensive observability. The patterns and implementations shown in this guide provide a solid foundation for production-ready MCP infrastructure.

Key takeaways for successful MCP server deployment:

Security First: Implement OAuth 2.1 flows, robust token validation, and multi-tenant isolation from day one. Security vulnerabilities in MCP servers can have cascading effects across AI-driven workflows.

Observability is Critical: Comprehensive logging, tracing, and metrics enable rapid issue detection and resolution. The correlation ID pattern ensures end-to-end request tracking across distributed systems.

Plan for Scale: Container orchestration with Kubernetes, AI gateways for cross-cutting concerns, and horizontal pod autoscaling ensure your MCP infrastructure can handle growing AI workloads.

Automate Security: Dynamic secrets management, automated security audits, and infrastructure-as-code practices reduce the risk of configuration drift and human error.

By following these architectural patterns and implementation strategies, your MCP servers will be ready to securely serve AI agents at enterprise scale. Remember to regularly review and update your security posture as the MCP ecosystem continues to evolve.


