How to Build Secure Remote MCP Servers Effectively
As AI agents become integral to enterprise workflows, the Model Context Protocol (MCP) layer has emerged as a critical infrastructure component. MCP servers act as secure bridges between Large Language Models and sensitive enterprise systems—from customer databases to financial platforms. This comprehensive guide covers everything you need to build, secure, and scale MCP servers for production environments.
Understanding MCP Security Threats
The Attack Surface
MCP servers present unique security challenges because they:
- Handle sensitive enterprise data across multiple systems
- Execute AI-driven actions with real business impact
- Operate in high-trust environments with elevated privileges
- Process dynamic requests from autonomous agents
Common Vulnerability Patterns
Confused Deputy Attacks: Malicious actors trick AI agents into performing unauthorized actions by manipulating context or instructions.
Token Relay Exploits: Attackers replay stolen or leaked access tokens against other MCP endpoints that accept them, reaching resources the token was never issued for.
Session Hijacking: Compromised session data allows attackers to impersonate legitimate AI agents.
Data Exfiltration: Poorly scoped queries expose sensitive information across tenant boundaries.
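A common thread runs through the mitigations covered below: on every request, verify that a token was issued for this exact server and this exact tenant. As an illustrative sketch (the claim names are assumptions; the following sections flesh this out properly):

```python
# Illustrative per-request guard. Claim names ("aud", "tenant_id") are
# assumptions; the sections below show full, standards-based validation.
def guard_request(claims: dict, expected_audience: str, request_tenant: str) -> None:
    if claims.get("aud") != expected_audience:      # blocks token relay
        raise PermissionError("token not issued for this resource")
    if claims.get("tenant_id") != request_tenant:   # blocks cross-tenant exfiltration
        raise PermissionError("cross-tenant access denied")
```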
Implementing OAuth 2.1 Authentication
Why OAuth 2.1 for MCP?
OAuth 2.1 provides standardized security patterns specifically designed for modern API architectures:
```python
# MCP Server OAuth 2.1 Implementation
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer
import httpx
import jwt
from typing import Optional

app = FastAPI()
security = HTTPBearer()

class MCPOAuthConfig:
    def __init__(self):
        self.jwks_url = "https://your-auth-server.com/.well-known/jwks.json"
        self.audience = "https://your-mcp-server.com"
        self.issuer = "https://your-auth-server.com"
        self.metadata_endpoint = "/.well-known/oauth-protected-resource"

oauth_config = MCPOAuthConfig()

@app.get("/.well-known/oauth-protected-resource")
async def oauth_metadata():
    """RFC 9728 - OAuth 2.0 Protected Resource Metadata"""
    # Note: response_types_supported / grant_types_supported belong to
    # authorization server metadata (RFC 8414), not resource metadata.
    return {
        "resource": oauth_config.audience,
        "authorization_servers": [oauth_config.issuer],
        "jwks_uri": oauth_config.jwks_url,
        "scopes_supported": ["mcp:read", "mcp:write", "mcp:admin"],
        "bearer_methods_supported": ["header"]
    }
```
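Clients use this document to discover where to obtain tokens. A hypothetical discovery call (the `httpx` dependency and the placeholder URL are assumptions):

```python
# Hypothetical discovery call an MCP client could make against the
# metadata endpoint defined above.
import httpx

resp = httpx.get("https://your-mcp-server.com/.well-known/oauth-protected-resource")
resp.raise_for_status()
metadata = resp.json()
auth_server = metadata["authorization_servers"][0]  # where to obtain tokens
```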
Dynamic Client Registration
Enable automatic client registration for scaling AI agents:
@app.post("/oauth/register")
async def dynamic_client_registration(registration_request: dict):
"""RFC 7591 - Dynamic Client Registration"""
# Validate registration request
required_fields = ["client_name", "redirect_uris", "scope"]
if not all(field in registration_request for field in required_fields):
raise HTTPException(status_code=400, detail="Missing required fields")
# Generate client credentials
client_id = generate_client_id()
client_secret = generate_client_secret()
# Store client metadata
await store_client_metadata({
"client_id": client_id,
"client_secret": client_secret,
"client_name": registration_request["client_name"],
"redirect_uris": registration_request["redirect_uris"],
"scope": registration_request["scope"],
"created_at": datetime.utcnow()
})
return {
"client_id": client_id,
"client_secret": client_secret,
"registration_access_token": generate_registration_token(),
"expires_in": 3600
}
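The credential helpers are referenced but not defined above; a minimal sketch using the standard library, with an in-memory dict standing in for a real client store (all names are assumptions, and production code would persist hashed secrets in a database):

```python
# Hypothetical implementations of the helpers used above.
import secrets

_CLIENTS: dict = {}  # in-memory stand-in for a real client store

def generate_client_id() -> str:
    return secrets.token_urlsafe(16)

def generate_client_secret() -> str:
    return secrets.token_urlsafe(32)

def generate_registration_token() -> str:
    # Lets the client manage its own registration later (RFC 7592)
    return secrets.token_urlsafe(32)

async def store_client_metadata(metadata: dict) -> None:
    _CLIENTS[metadata["client_id"]] = metadata
```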
Advanced Token Validation Strategies
Robust JWT Validation
Implement comprehensive token validation with proper error handling:
```python
import jwt
from jwt import PyJWKClient
from fastapi import HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials

class TokenValidator:
    def __init__(self, jwks_url: str, audience: str, issuer: str):
        self.jwks_client = PyJWKClient(jwks_url)
        self.audience = audience
        self.issuer = issuer

    async def validate_token(self, token: str) -> dict:
        try:
            # Get signing key from the JWKS endpoint (cached by PyJWKClient)
            signing_key = self.jwks_client.get_signing_key_from_jwt(token)

            # Decode and validate
            payload = jwt.decode(
                token,
                signing_key.key,
                algorithms=["RS256"],
                audience=self.audience,
                issuer=self.issuer,
                options={
                    "verify_signature": True,
                    "verify_exp": True,
                    "verify_aud": True,
                    "verify_iss": True
                }
            )
        except jwt.ExpiredSignatureError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token expired",
                headers={"WWW-Authenticate": f'Bearer realm="{self.audience}"'}
            )
        except jwt.InvalidAudienceError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid audience"
            )
        except jwt.InvalidTokenError as e:
            # Catch PyJWT's base class rather than a bare Exception so the
            # 403 raised below is not accidentally re-wrapped as a 401
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=f"Token validation failed: {e}"
            )

        # Additional custom validations (outside the try so a 403 stays a 403)
        if not self._validate_scopes(payload.get("scope", "")):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Insufficient scope"
            )

        return payload

    def _validate_scopes(self, token_scopes: str) -> bool:
        required_scopes = {"mcp:read"}  # Minimum required scope
        token_scope_set = set(token_scopes.split())
        return required_scopes.issubset(token_scope_set)

# Build the validator once so the JWKS cache is reused across requests
token_validator = TokenValidator(
    oauth_config.jwks_url,
    oauth_config.audience,
    oauth_config.issuer
)

# Dependency for protected endpoints
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> dict:
    return await token_validator.validate_token(credentials.credentials)
```
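With the dependency wired up, protecting an endpoint takes a single parameter. A hypothetical example:

```python
# Hypothetical protected endpoint: get_current_user rejects the request
# with 401/403 before this handler body runs.
@app.get("/api/whoami")
async def whoami(claims: dict = Depends(get_current_user)):
    return {"sub": claims["sub"], "scope": claims.get("scope", "")}
```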
Resource-Bound Tokens (RFC 8707)
Implement resource indicators to prevent token reuse:
@app.middleware("http")
async def validate_resource_binding(request: Request, call_next):
if request.url.path.startswith("/api/"):
auth_header = request.headers.get("authorization")
if auth_header:
token = auth_header.replace("Bearer ", "")
payload = jwt.decode(token, options={"verify_signature": False})
# Verify resource binding
if payload.get("resource") != oauth_config.audience:
return JSONResponse(
status_code=403,
content={"error": "Token not bound to this resource"}
)
response = await call_next(request)
return response
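The binding itself is established when the token is requested: RFC 8707 has the client send a `resource` parameter to the authorization server, which embeds it in the issued token. A hedged sketch of such a request (URLs, credentials, and the `httpx` call are assumptions):

```python
# Hypothetical RFC 8707 token request: the "resource" parameter asks the
# authorization server to bind the token to this specific MCP server.
import httpx

token_response = httpx.post(
    "https://your-auth-server.com/oauth/token",
    data={
        "grant_type": "client_credentials",
        "client_id": "agent-client-id",
        "client_secret": "agent-client-secret",
        "scope": "mcp:read mcp:write",
        "resource": "https://your-mcp-server.com",
    },
).json()
```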
Multi-Tenant Authorization Architecture
User Context Extraction
```python
import functools

class UserContext:
    def __init__(self, user_id: str, tenant_id: str, roles: list, scopes: list):
        self.user_id = user_id
        self.tenant_id = tenant_id
        self.roles = roles
        self.scopes = scopes

    def has_permission(self, resource: str, action: str) -> bool:
        """Check if user has permission for specific resource action"""
        required_scope = f"{resource}:{action}"
        return required_scope in self.scopes or "admin" in self.roles

def extract_user_context(token_payload: dict) -> UserContext:
    return UserContext(
        user_id=token_payload["sub"],
        tenant_id=token_payload.get("tenant_id"),
        roles=token_payload.get("roles", []),
        scopes=token_payload.get("scope", "").split()
    )

# Dependency that upgrades raw JWT claims into a UserContext
async def get_current_context(claims: dict = Depends(get_current_user)) -> UserContext:
    return extract_user_context(claims)

# Authorization decorator
def require_permission(resource: str, action: str):
    def decorator(func):
        @functools.wraps(func)  # preserve the signature for FastAPI's dependency injection
        async def wrapper(*args, **kwargs):
            user_context = kwargs.get("current_user")
            if user_context is None or not user_context.has_permission(resource, action):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Permission denied for {resource}:{action}"
                )
            return await func(*args, **kwargs)
        return wrapper
    return decorator
```
Data Isolation Patterns
```python
class TenantAwareRepository:
    def __init__(self, db_session):
        self.db = db_session

    async def get_documents(self, user_context: UserContext, filters: dict):
        """Always scope queries by tenant"""
        # Document is assumed to be an ORM model with tenant_id/owner_id columns
        query = self.db.query(Document).filter(
            Document.tenant_id == user_context.tenant_id
        )

        # Apply additional user-level filters
        if "manager" not in user_context.roles:
            query = query.filter(Document.owner_id == user_context.user_id)

        # Apply request filters
        for key, value in filters.items():
            if hasattr(Document, key):
                query = query.filter(getattr(Document, key) == value)

        return query.all()

@app.get("/api/documents")
@require_permission("documents", "read")
async def get_documents(
    filters: Optional[dict] = None,  # map explicit query parameters in production
    current_user: UserContext = Depends(get_current_context)
):
    repo = TenantAwareRepository(db_session)  # db_session comes from your ORM setup
    documents = await repo.get_documents(current_user, filters or {})
    return {"documents": documents}
```
Container Orchestration and Auto-Scaling
Kubernetes Deployment Configuration
```yaml
# mcp-server-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-server
  labels:
    app: mcp-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-server
  template:
    metadata:
      labels:
        app: mcp-server
    spec:
      containers:
        - name: mcp-server
          image: your-registry/mcp-server:latest
          ports:
            - containerPort: 8000
          env:
            - name: OAUTH_JWKS_URL
              valueFrom:
                configMapKeyRef:
                  name: mcp-config
                  key: jwks-url
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: mcp-secrets
                  key: database-url
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: mcp-server-service
spec:
  selector:
    app: mcp-server
  ports:
    - port: 80
      targetPort: 8000
  type: LoadBalancer
```
Horizontal Pod Autoscaler
```yaml
# mcp-hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-server-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-server
  minReplicas: 3
  maxReplicas: 50
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Percent
          value: 100
          periodSeconds: 15
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 10
          periodSeconds: 60
```
Health Check Implementation
@app.get("/health")
async def health_check():
"""Kubernetes liveness probe endpoint"""
try:
# Check critical dependencies
await check_database_connection()
await check_oauth_provider()
await check_external_services()
return {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"version": os.getenv("APP_VERSION", "unknown")
}
except Exception as e:
raise HTTPException(status_code=503, detail=f"Health check failed: {e}")
@app.get("/ready")
async def readiness_check():
"""Kubernetes readiness probe endpoint"""
try:
# Check if server is ready to accept traffic
await verify_oauth_configuration()
await check_cache_connectivity()
return {"status": "ready"}
except Exception as e:
raise HTTPException(status_code=503, detail=f"Not ready: {e}")
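The probe helpers are referenced but not shown; minimal sketches assuming an async SQLAlchemy engine (`db_engine`) and `httpx` (the remaining helpers follow the same shape):

```python
# Hypothetical probe helpers; db_engine is a placeholder.
import httpx
from sqlalchemy import text

async def check_database_connection():
    # Assumes an async SQLAlchemy engine configured elsewhere in the app
    async with db_engine.connect() as conn:
        await conn.execute(text("SELECT 1"))

async def check_oauth_provider():
    # Token validation depends on the JWKS endpoint being reachable
    async with httpx.AsyncClient(timeout=5) as client:
        resp = await client.get(oauth_config.jwks_url)
        resp.raise_for_status()
```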
AI Gateway Implementation
Kong Gateway Configuration
```yaml
# kong-gateway.yaml
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
  name: mcp-rate-limiting
plugin: rate-limiting
config:
  minute: 1000
  hour: 10000
  policy: redis
  redis_host: redis-cluster
  fault_tolerant: true
---
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
  name: mcp-jwt-auth
plugin: jwt
config:
  key_claim_name: kid
  secret_is_base64: false
  # Kong's jwt plugin verifies exp and nbf; audience checks stay in the app layer
  claims_to_verify:
    - exp
    - nbf
---
apiVersion: configuration.konghq.com/v1
kind: KongIngress
metadata:
  name: mcp-ingress
spec:
  upstream:
    healthchecks:
      active:
        http_path: /health
        healthy:
          successes: 3
          interval: 10
        unhealthy:
          http_failures: 3
          interval: 10
  proxy:
    connect_timeout: 10000
    read_timeout: 60000
    write_timeout: 60000
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: mcp-server-ingress
  annotations:
    konghq.com/plugins: mcp-rate-limiting,mcp-jwt-auth
    konghq.com/override: mcp-ingress
spec:
  ingressClassName: kong
  rules:
    - host: mcp-api.yourcompany.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: mcp-server-service
                port:
                  number: 80
```
Custom Gateway Middleware
```python
# gateway_middleware.py
from fastapi import Request, HTTPException
import time
import redis.asyncio as redis  # async client so the awaits below are valid
from typing import Dict, Any

class AIGatewayMiddleware:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.rate_limits = {
            "default": {"requests": 100, "window": 60},
            "premium": {"requests": 1000, "window": 60}
        }

    async def __call__(self, request: Request, call_next):
        start_time = time.time()

        # Rate limiting
        await self._check_rate_limit(request)

        # Request validation (schema/size checks; sketched below)
        await self._validate_request(request)

        # Process request
        response = await call_next(request)

        # Add security headers
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-XSS-Protection"] = "1; mode=block"

        # Log metrics
        process_time = time.time() - start_time
        await self._log_metrics(request, response, process_time)

        return response

    async def _check_rate_limit(self, request: Request):
        client_id = self._extract_client_id(request)
        tier = await self._get_client_tier(client_id)
        limit_config = self.rate_limits.get(tier, self.rate_limits["default"])

        key = f"rate_limit:{client_id}"
        current = await self.redis.incr(key)
        if current == 1:
            await self.redis.expire(key, limit_config["window"])

        if current > limit_config["requests"]:
            raise HTTPException(
                status_code=429,
                detail="Rate limit exceeded",
                headers={
                    "Retry-After": str(limit_config["window"]),
                    "X-RateLimit-Limit": str(limit_config["requests"]),
                    "X-RateLimit-Remaining": "0"
                }
            )
```
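The private helpers and the app wiring are left implicit above; a hedged sketch of both, where extracting the client ID from the JWT `sub` claim and a static tier lookup are assumptions:

```python
# Hypothetical helper implementations plus app wiring for the middleware above.
import logging

gateway_log = logging.getLogger("gateway")

class SimpleAIGatewayMiddleware(AIGatewayMiddleware):
    def _extract_client_id(self, request: Request) -> str:
        # Use the JWT subject when present, else fall back to the caller IP
        auth = request.headers.get("authorization", "")
        if auth.startswith("Bearer "):
            try:
                claims = jwt.decode(auth.removeprefix("Bearer "),
                                    options={"verify_signature": False})
                return claims.get("sub", request.client.host)
            except jwt.DecodeError:
                pass
        return request.client.host

    async def _get_client_tier(self, client_id: str) -> str:
        return "default"  # a real deployment would consult an entitlement store

    async def _validate_request(self, request: Request) -> None:
        # Cheap sanity check; add schema validation as needed
        if int(request.headers.get("content-length") or 0) > 1_000_000:
            raise HTTPException(status_code=413, detail="Payload too large")

    async def _log_metrics(self, request: Request, response, duration: float) -> None:
        gateway_log.info("%s %s -> %s (%.3fs)", request.method,
                         request.url.path, response.status_code, duration)

redis_client = redis.Redis(host="redis-cluster", port=6379)
app.middleware("http")(SimpleAIGatewayMiddleware(redis_client))
```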
Enterprise Secrets Management
Azure Key Vault Integration
```python
import asyncio
import logging
import os
import time
from typing import Any, Dict, Optional

from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

logger = logging.getLogger(__name__)

class SecretsManager:
    def __init__(self, vault_url: str):
        self.vault_url = vault_url
        self.credential = DefaultAzureCredential()
        self.client = SecretClient(vault_url=vault_url, credential=self.credential)
        self._cache: Dict[str, Any] = {}
        self._cache_ttl = 300  # 5 minutes

    async def get_secret(self, secret_name: str) -> Optional[str]:
        """Get secret with caching and automatic refresh"""
        cache_key = f"secret:{secret_name}"

        # Check cache first
        if cache_key in self._cache:
            cached_data = self._cache[cache_key]
            if time.time() - cached_data["timestamp"] < self._cache_ttl:
                return cached_data["value"]

        try:
            # Fetch from Key Vault without blocking the event loop
            secret = await asyncio.to_thread(
                self.client.get_secret, secret_name
            )
            # Cache the result
            self._cache[cache_key] = {
                "value": secret.value,
                "timestamp": time.time()
            }
            return secret.value
        except Exception as e:
            logger.error(f"Failed to retrieve secret {secret_name}: {e}")
            # Return cached value if available, even if expired
            if cache_key in self._cache:
                return self._cache[cache_key]["value"]
            raise

    async def get_database_config(self) -> dict:
        """Get database configuration from secrets"""
        return {
            "host": await self.get_secret("db-host"),
            "port": int(await self.get_secret("db-port")),
            "database": await self.get_secret("db-name"),
            "username": await self.get_secret("db-username"),
            "password": await self.get_secret("db-password")
        }

# Initialize secrets manager
secrets_manager = SecretsManager(
    vault_url=os.getenv("AZURE_KEY_VAULT_URL")
)

@app.on_event("startup")
async def load_secrets():
    """Load critical secrets at startup"""
    try:
        oauth_config.client_secret = await secrets_manager.get_secret("oauth-client-secret")
        oauth_config.jwt_signing_key = await secrets_manager.get_secret("jwt-signing-key")

        # Validate all required secrets are present
        required_secrets = ["oauth-client-secret", "jwt-signing-key", "db-password"]
        for secret_name in required_secrets:
            value = await secrets_manager.get_secret(secret_name)
            if not value:
                raise ValueError(f"Required secret {secret_name} not found")

        logger.info("All required secrets loaded successfully")
    except Exception as e:
        logger.error(f"Failed to load secrets: {e}")
        raise
```
Kubernetes Secrets with CSI Driver
```yaml
# secretstore.yaml
apiVersion: secrets-store.csi.x-k8s.io/v1
kind: SecretProviderClass
metadata:
  name: mcp-secrets
spec:
  provider: azure
  parameters:
    usePodIdentity: "false"
    useVMManagedIdentity: "true"
    userAssignedIdentityClientID: "your-identity-client-id"
    keyvaultName: "your-keyvault-name"
    tenantId: "your-tenant-id"
    objects: |
      array:
        - |
          objectName: oauth-client-secret
          objectType: secret
        - |
          objectName: jwt-signing-key
          objectType: secret
        - |
          objectName: db-password
          objectType: secret
  secretObjects:
    - secretName: mcp-app-secrets
      type: Opaque
      data:
        - objectName: oauth-client-secret
          key: oauth-client-secret
        - objectName: jwt-signing-key
          key: jwt-signing-key
        - objectName: db-password
          key: db-password
```
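A SecretProviderClass does nothing until a pod mounts it through the CSI driver; the synced `mcp-app-secrets` Secret also only exists while such a pod is running. A hypothetical fragment to add to the deployment's pod spec shown earlier:

```yaml
# Hypothetical additions to the mcp-server pod spec from the deployment above.
spec:
  containers:
    - name: mcp-server
      volumeMounts:
        - name: secrets-store
          mountPath: /mnt/secrets
          readOnly: true
  volumes:
    - name: secrets-store
      csi:
        driver: secrets-store.csi.k8s.io
        readOnly: true
        volumeAttributes:
          secretProviderClass: mcp-secrets
```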
Production Observability Stack
Structured Logging with Correlation IDs
```python
import time
import uuid
from contextvars import ContextVar

import structlog
from fastapi import Request

# Context variable for correlation ID
correlation_id_var: ContextVar[str] = ContextVar('correlation_id', default='')

@app.middleware("http")
async def correlation_id_middleware(request: Request, call_next):
    # Generate or extract correlation ID
    correlation_id = request.headers.get('X-Correlation-ID') or str(uuid.uuid4())
    correlation_id_var.set(correlation_id)

    # Add to response headers
    response = await call_next(request)
    response.headers['X-Correlation-ID'] = correlation_id
    return response

# Configure structured logging
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.dev.ConsoleRenderer()  # Use JSONRenderer for production
    ],
    wrapper_class=structlog.make_filtering_bound_logger(20),  # INFO level
    logger_factory=structlog.PrintLoggerFactory(),
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

@app.middleware("http")
async def logging_middleware(request: Request, call_next):
    start_time = time.time()

    # Log request
    logger.info(
        "Request started",
        method=request.method,
        url=str(request.url),
        client_ip=request.client.host,
        user_agent=request.headers.get("user-agent"),
        correlation_id=correlation_id_var.get()
    )

    try:
        response = await call_next(request)

        # Log successful response
        process_time = time.time() - start_time
        logger.info(
            "Request completed",
            status_code=response.status_code,
            process_time=process_time,
            correlation_id=correlation_id_var.get()
        )
        return response
    except Exception as e:
        # Log error
        process_time = time.time() - start_time
        logger.error(
            "Request failed",
            error=str(e),
            error_type=type(e).__name__,
            process_time=process_time,
            correlation_id=correlation_id_var.get()
        )
        raise
```
OpenTelemetry Distributed Tracing
```python
import os

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

# Configure tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Configure Jaeger exporter (the Thrift exporter is deprecated upstream;
# newer stacks typically ship spans via OTLP instead)
jaeger_exporter = JaegerExporter(
    agent_host_name=os.getenv("JAEGER_AGENT_HOST", "localhost"),
    agent_port=int(os.getenv("JAEGER_AGENT_PORT", "6831")),
)
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Auto-instrument FastAPI, outbound requests, and SQLAlchemy
FastAPIInstrumentor.instrument_app(app)
RequestsInstrumentor().instrument()
SQLAlchemyInstrumentor().instrument(engine=db_engine)  # assumes a db_engine from your ORM setup

# Custom tracing for business logic
@app.post("/api/process-request")  # POST, since the handler reads a JSON body
async def process_request(request_data: dict, current_user: UserContext = Depends(get_current_context)):
    with tracer.start_as_current_span("process_mcp_request") as span:
        span.set_attribute("user.id", current_user.user_id)
        span.set_attribute("tenant.id", current_user.tenant_id)
        span.set_attribute("request.type", request_data.get("type", "unknown"))
        try:
            # Process request with nested spans (helpers assumed elsewhere)
            with tracer.start_as_current_span("validate_input"):
                await validate_request_data(request_data)
            with tracer.start_as_current_span("execute_business_logic"):
                result = await execute_mcp_operation(request_data, current_user)
            span.set_attribute("result.status", "success")
            return result
        except Exception as e:
            span.set_attribute("result.status", "error")
            span.set_attribute("error.message", str(e))
            raise
```
Prometheus Metrics
```python
import time

from fastapi import Request, Response
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST

# Define metrics. Keep label cardinality in mind: tenant_id as a label works
# for tens of tenants, not tens of thousands.
REQUEST_COUNT = Counter(
    'mcp_requests_total',
    'Total number of MCP requests',
    ['method', 'endpoint', 'status_code', 'tenant_id']
)
REQUEST_DURATION = Histogram(
    'mcp_request_duration_seconds',
    'Request duration in seconds',
    ['method', 'endpoint']
)
ACTIVE_CONNECTIONS = Gauge(
    'mcp_active_connections',
    'Number of active connections'
)
TOKEN_VALIDATION_ERRORS = Counter(
    'mcp_token_validation_errors_total',
    'Number of token validation errors',
    ['error_type']
)

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()
    ACTIVE_CONNECTIONS.inc()
    try:
        response = await call_next(request)

        # Record metrics
        REQUEST_COUNT.labels(
            method=request.method,
            endpoint=request.url.path,
            status_code=response.status_code,
            tenant_id=getattr(request.state, 'tenant_id', 'unknown')
        ).inc()
        REQUEST_DURATION.labels(
            method=request.method,
            endpoint=request.url.path
        ).observe(time.time() - start_time)

        return response
    finally:
        ACTIVE_CONNECTIONS.dec()

@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint"""
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
```
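Once Prometheus scrapes this endpoint, the histogram above supports latency dashboards and alerts. For example, p95 latency per endpoint over a five-minute window:

```
histogram_quantile(
  0.95,
  sum(rate(mcp_request_duration_seconds_bucket[5m])) by (le, endpoint)
)
```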
Performance Benchmarking
Load Testing with Locust
```python
# locustfile.py
from locust import HttpUser, task, between
import random
import jwt
import time

class MCPUser(HttpUser):
    wait_time = between(1, 5)

    def on_start(self):
        """Generate test token"""
        self.token = self.generate_test_token()
        self.headers = {"Authorization": f"Bearer {self.token}"}

    def generate_test_token(self):
        payload = {
            "sub": f"test-user-{random.randint(1, 1000)}",
            "tenant_id": f"tenant-{random.randint(1, 10)}",
            "scope": "mcp:read mcp:write",
            "exp": int(time.time()) + 3600,
            "aud": "https://your-mcp-server.com"
        }
        return jwt.encode(payload, "secret", algorithm="HS256")

    @task(3)
    def read_documents(self):
        """Simulate document read operations"""
        response = self.client.get(
            "/api/documents",
            headers=self.headers,
            params={"limit": 10}
        )
        if response.status_code == 200:
            documents = response.json().get("documents", [])
            if documents:
                # Follow up with detail request
                doc_id = random.choice(documents)["id"]
                self.client.get(f"/api/documents/{doc_id}", headers=self.headers)

    @task(1)
    def create_document(self):
        """Simulate document creation"""
        document_data = {
            "title": f"Test Document {random.randint(1, 10000)}",
            "content": "This is a test document for load testing",
            "tags": ["test", "load-test"]
        }
        self.client.post(
            "/api/documents",
            json=document_data,
            headers=self.headers
        )

    @task(2)
    def search_documents(self):
        """Simulate search operations"""
        search_terms = ["test", "document", "data", "report", "analysis"]
        query = random.choice(search_terms)
        self.client.get(
            "/api/search",
            headers=self.headers,
            params={"q": query, "limit": 20}
        )

# Run with: locust -f locustfile.py --host=https://your-mcp-server.com
```
Performance Monitoring Dashboard
```python
# performance_monitor.py
import asyncio
import logging
import time
from dataclasses import dataclass
from typing import List

import psutil

logger = logging.getLogger(__name__)

@dataclass
class PerformanceMetrics:
    timestamp: float
    cpu_percent: float
    memory_percent: float
    disk_io_read: int
    disk_io_write: int
    network_io_sent: int
    network_io_recv: int
    active_connections: int
    response_time_p95: float
    error_rate: float

class PerformanceMonitor:
    def __init__(self):
        self.metrics_history: List[PerformanceMetrics] = []
        self.response_times: List[float] = []
        self.error_count = 0
        self.request_count = 0
        self.active_connections = 0  # updated by request middleware (see below)

    async def collect_metrics(self):
        """Collect system and application metrics"""
        while True:
            try:
                # System metrics
                cpu_percent = psutil.cpu_percent(interval=1)
                memory = psutil.virtual_memory()
                disk_io = psutil.disk_io_counters()
                network_io = psutil.net_io_counters()

                # Application metrics
                p95_response_time = self._calculate_p95_response_time()
                error_rate = (self.error_count / max(self.request_count, 1)) * 100

                metrics = PerformanceMetrics(
                    timestamp=time.time(),
                    cpu_percent=cpu_percent,
                    memory_percent=memory.percent,
                    disk_io_read=disk_io.read_bytes if disk_io else 0,
                    disk_io_write=disk_io.write_bytes if disk_io else 0,
                    network_io_sent=network_io.bytes_sent if network_io else 0,
                    network_io_recv=network_io.bytes_recv if network_io else 0,
                    active_connections=self.active_connections,
                    response_time_p95=p95_response_time,
                    error_rate=error_rate
                )
                self.metrics_history.append(metrics)

                # Keep only last 1000 metrics
                if len(self.metrics_history) > 1000:
                    self.metrics_history = self.metrics_history[-1000:]

                # Reset counters periodically
                if len(self.response_times) > 10000:
                    self.response_times = self.response_times[-1000:]
                    self.error_count = int(self.error_count * 0.1)
                    self.request_count = int(self.request_count * 0.1)

                await asyncio.sleep(10)  # Collect every 10 seconds
            except Exception as e:
                logger.error(f"Error collecting metrics: {e}")
                await asyncio.sleep(10)

    def _calculate_p95_response_time(self) -> float:
        if not self.response_times:
            return 0.0
        sorted_times = sorted(self.response_times)
        index = int(0.95 * len(sorted_times))
        return sorted_times[index] if index < len(sorted_times) else 0.0

    def record_request(self, response_time: float, is_error: bool = False):
        """Record request metrics"""
        self.response_times.append(response_time)
        self.request_count += 1
        if is_error:
            self.error_count += 1

# Initialize performance monitor
perf_monitor = PerformanceMonitor()

@app.on_event("startup")
async def start_performance_monitoring():
    asyncio.create_task(perf_monitor.collect_metrics())

@app.get("/api/performance/metrics")
async def get_performance_metrics():
    """Get current performance metrics"""
    if not perf_monitor.metrics_history:
        return {"message": "No metrics available yet"}

    latest = perf_monitor.metrics_history[-1]
    return {
        "current": {
            "cpu_percent": latest.cpu_percent,
            "memory_percent": latest.memory_percent,
            "response_time_p95": latest.response_time_p95,
            "error_rate": latest.error_rate,
            "active_connections": latest.active_connections
        },
        "history": perf_monitor.metrics_history[-100:]  # Last 100 data points
    }
```
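Nothing calls `record_request` yet; a hedged middleware sketch that feeds the monitor (and its connection counter) from the request path, where classifying any 5xx response as an error is an assumption:

```python
# Hypothetical wiring: feed the monitor from every request.
import time
from fastapi import Request

@app.middleware("http")
async def performance_tracking_middleware(request: Request, call_next):
    perf_monitor.active_connections += 1
    start = time.time()
    try:
        response = await call_next(request)
        perf_monitor.record_request(time.time() - start,
                                    is_error=response.status_code >= 500)
        return response
    except Exception:
        perf_monitor.record_request(time.time() - start, is_error=True)
        raise
    finally:
        perf_monitor.active_connections -= 1
```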
Security Checklist
Pre-Production Security Audit
```python
# security_audit.py
import asyncio
import socket
import ssl
import subprocess
from typing import Any, Dict, List
from urllib.parse import urlparse

import requests

class SecurityAudit:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.results: List[Dict[str, Any]] = []

    async def run_full_audit(self) -> Dict[str, Any]:
        """Run comprehensive security audit"""
        audit_results = {}

        # SSL/TLS Configuration
        audit_results["ssl_audit"] = await self._audit_ssl_configuration()

        # HTTP Security Headers
        audit_results["headers_audit"] = await self._audit_security_headers()

        # Authentication Flows
        audit_results["auth_audit"] = await self._audit_authentication()

        # API Security
        audit_results["api_audit"] = await self._audit_api_security()

        # Dependencies
        audit_results["dependencies_audit"] = await self._audit_dependencies()

        return audit_results

    async def _audit_ssl_configuration(self) -> Dict[str, Any]:
        """Audit SSL/TLS configuration"""
        try:
            hostname = urlparse(self.base_url).hostname
            context = ssl.create_default_context()
            with socket.create_connection((hostname, 443)) as sock:
                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                    cert = ssock.getpeercert()
                    cipher = ssock.cipher()
                    return {
                        "status": "pass",
                        "certificate_info": {
                            "subject": dict(x[0] for x in cert["subject"]),
                            "issuer": dict(x[0] for x in cert["issuer"]),
                            "version": cert["version"],
                            "expires": cert["notAfter"]
                        },
                        "cipher_suite": cipher,
                        "tls_version": ssock.version()
                    }
        except Exception as e:
            return {"status": "fail", "error": str(e)}

    async def _audit_security_headers(self) -> Dict[str, Any]:
        """Audit HTTP security headers"""
        try:
            response = requests.get(self.base_url, timeout=10)
            headers = response.headers

            required_headers = {
                "X-Content-Type-Options": "nosniff",
                "X-Frame-Options": ["DENY", "SAMEORIGIN"],
                "X-XSS-Protection": "1; mode=block",
                "Strict-Transport-Security": None,  # Should be present
                "Content-Security-Policy": None
            }

            audit_results = {"status": "pass", "headers": {}}
            for header, expected_value in required_headers.items():
                if header in headers:
                    actual_value = headers[header]
                    if expected_value is None:
                        audit_results["headers"][header] = {"status": "present", "value": actual_value}
                    elif isinstance(expected_value, list):
                        status = "pass" if actual_value in expected_value else "fail"
                        audit_results["headers"][header] = {"status": status, "value": actual_value}
                    else:
                        status = "pass" if actual_value == expected_value else "fail"
                        audit_results["headers"][header] = {"status": status, "value": actual_value}
                else:
                    audit_results["headers"][header] = {"status": "missing"}
                    audit_results["status"] = "fail"

            return audit_results
        except Exception as e:
            return {"status": "error", "error": str(e)}

    async def _audit_authentication(self) -> Dict[str, Any]:
        """Audit authentication implementation"""
        audit_results = {"tests": []}

        # Test 1: Unauthenticated request should return 401
        try:
            response = requests.get(f"{self.base_url}/api/documents", timeout=10)
            if response.status_code == 401:
                audit_results["tests"].append({
                    "name": "Unauthenticated access blocked",
                    "status": "pass"
                })
            else:
                audit_results["tests"].append({
                    "name": "Unauthenticated access blocked",
                    "status": "fail",
                    "details": f"Expected 401, got {response.status_code}"
                })
        except Exception as e:
            audit_results["tests"].append({
                "name": "Unauthenticated access blocked",
                "status": "error",
                "error": str(e)
            })

        # Test 2: Invalid token should return 401
        try:
            headers = {"Authorization": "Bearer invalid-token"}
            response = requests.get(f"{self.base_url}/api/documents", headers=headers, timeout=10)
            if response.status_code == 401:
                audit_results["tests"].append({
                    "name": "Invalid token rejected",
                    "status": "pass"
                })
            else:
                audit_results["tests"].append({
                    "name": "Invalid token rejected",
                    "status": "fail",
                    "details": f"Expected 401, got {response.status_code}"
                })
        except Exception as e:
            audit_results["tests"].append({
                "name": "Invalid token rejected",
                "status": "error",
                "error": str(e)
            })

        return audit_results

    async def _audit_api_security(self) -> Dict[str, Any]:
        """Audit API-level controls. Left as a stub here; extend with checks
        specific to your endpoints (rate-limit headers, verb restrictions,
        input validation)."""
        return {"status": "skip", "detail": "No API-specific checks implemented"}

    async def _audit_dependencies(self) -> Dict[str, Any]:
        """Audit dependencies for known vulnerabilities"""
        try:
            # Run safety check (requires the safety package)
            result = subprocess.run(
                ["safety", "check", "--json"],
                capture_output=True,
                text=True,
                timeout=30
            )
            if result.returncode == 0:
                return {"status": "pass", "vulnerabilities": []}
            else:
                return {"status": "fail", "vulnerabilities": result.stdout}
        except subprocess.TimeoutExpired:
            return {"status": "error", "error": "Safety check timed out"}
        except FileNotFoundError:
            return {"status": "skip", "error": "Safety tool not installed"}
        except Exception as e:
            return {"status": "error", "error": str(e)}

# Usage
async def run_security_audit():
    audit = SecurityAudit("https://your-mcp-server.com")
    results = await audit.run_full_audit()

    print("Security Audit Results:")
    print("=" * 50)
    for category, result in results.items():
        print(f"\n{category.upper()}:")
        if result.get("status") == "pass":
            print("✅ PASS")
        elif result.get("status") == "fail":
            print("❌ FAIL")
        else:
            print("⚠️ WARNING")
        if "error" in result:
            print(f"  Error: {result['error']}")

if __name__ == "__main__":
    asyncio.run(run_security_audit())
```
Deployment Security Checklist
Create a comprehensive checklist for production deployments:
```markdown
## MCP Server Security Checklist

### Authentication & Authorization
- [ ] OAuth 2.1 flow properly implemented
- [ ] JWT validation includes all required claims
- [ ] Token expiration properly enforced
- [ ] Resource binding (RFC 8707) implemented
- [ ] Multi-tenant isolation verified
- [ ] Role-based access control configured
- [ ] Dynamic client registration secured

### Network Security
- [ ] HTTPS enforced with valid TLS certificate
- [ ] Security headers properly configured
- [ ] CORS policy restrictive and appropriate
- [ ] Rate limiting configured
- [ ] API gateway deployed and configured
- [ ] Private networking for internal services

### Data Protection
- [ ] All database queries scoped by tenant
- [ ] Sensitive data encrypted at rest
- [ ] Audit logging for all data access
- [ ] Data retention policies implemented
- [ ] Backup encryption configured

### Infrastructure Security
- [ ] Container images scanned for vulnerabilities
- [ ] Kubernetes RBAC configured
- [ ] Network policies implemented
- [ ] Pod security standards enforced
- [ ] Secrets managed via secure store
- [ ] Regular security updates scheduled

### Monitoring & Alerting
- [ ] Structured logging implemented
- [ ] Distributed tracing configured
- [ ] Security event monitoring active
- [ ] Performance metrics collected
- [ ] Alert thresholds configured
- [ ] Incident response procedures documented

### Compliance & Governance
- [ ] Data classification performed
- [ ] Privacy impact assessment completed
- [ ] Security controls documented
- [ ] Regular security audits scheduled
- [ ] Incident response plan tested
```
Conclusion
Building secure and scalable MCP servers requires careful attention to authentication flows, multi-tenant isolation, container orchestration, and comprehensive observability. The patterns and implementations shown in this guide provide a solid foundation for production-ready MCP infrastructure.
Key takeaways for successful MCP server deployment:
Security First: Implement OAuth 2.1 flows, robust token validation, and multi-tenant isolation from day one. Security vulnerabilities in MCP servers can have cascading effects across AI-driven workflows.
Observability is Critical: Comprehensive logging, tracing, and metrics enable rapid issue detection and resolution. The correlation ID pattern ensures end-to-end request tracking across distributed systems.
Plan for Scale: Container orchestration with Kubernetes, AI gateways for cross-cutting concerns, and horizontal pod autoscaling ensure your MCP infrastructure can handle growing AI workloads.
Automate Security: Dynamic secrets management, automated security audits, and infrastructure-as-code practices reduce the risk of configuration drift and human error.
By following these architectural patterns and implementation strategies, your MCP servers will be ready to securely serve AI agents at enterprise scale. Remember to regularly review and update your security posture as the MCP ecosystem continues to evolve.
Have questions about implementing these patterns? Share your experiences and challenges in the comments below. For more AI infrastructure guides, subscribe to our newsletter.