Overview
Implement these security best practices to harden your MCP Server deployment for production. This guide covers configuration, deployment, and operational security.Security is an ongoing process, not a one-time setup. Regularly review and update your security posture.
Authentication Best Practices
Use Strong Authentication
Enforce MFA
Enforce MFA
Always require multi-factor authentication in production:Verify MFA in application:
Copy
Ask AI
# Keycloak configuration
keycloak:
realm: langgraph
mfa:
required: true
allowed_methods:
- totp # Authenticator app
- webauthn # Hardware keys
grace_period: 0 # Enforce immediately
Copy
Ask AI
@app.post("/message")
async def send_message(user: User = Depends(get_current_user)):
if not user.mfa_verified:
raise HTTPException(
status_code=403,
detail="MFA required for this operation"
)
# Process request
Strong Password Policy
Strong Password Policy
Configure password requirements:Password strength validation:
Copy
Ask AI
# Keycloak password policy
password_policy:
minimum_length: 12
require_uppercase: true
require_lowercase: true
require_numbers: true
require_special_chars: true
password_history: 10
max_age_days: 90
min_age_days: 1
not_username: true
not_email: true
Copy
Ask AI
import re
from zxcvbn import zxcvbn
def validate_password(password: str, user_inputs: list) -> bool:
# Check length
if len(password) < 12:
raise ValueError("Password must be at least 12 characters")
# Check complexity
if not re.search(r"[A-Z]", password):
raise ValueError("Password must contain uppercase letter")
if not re.search(r"[a-z]", password):
raise ValueError("Password must contain lowercase letter")
if not re.search(r"[0-9]", password):
raise ValueError("Password must contain number")
if not re.search(r"[!@#$%^&*]", password):
raise ValueError("Password must contain special character")
# Check strength
result = zxcvbn(password, user_inputs=user_inputs)
if result['score'] < 3:
raise ValueError("Password is too weak")
return True
Token Security
Token Security
Configure secure JWT tokens:
Copy
Ask AI
# Short-lived access tokens
ACCESS_TOKEN_EXPIRE_MINUTES = 60 # 1 hour
# Longer refresh tokens
REFRESH_TOKEN_EXPIRE_DAYS = 7
# Use RS256 (asymmetric)
ALGORITHM = "RS256"
# Rotate signing keys monthly
KEY_ROTATION_DAYS = 30
# JWT configuration
jwt_config = {
"algorithm": "RS256",
"access_token_expire": 3600,
"refresh_token_expire": 604800,
"issuer": "https://keycloak.yourdomain.com",
"audience": "mcp-server-langgraph",
"verify_signature": True,
"verify_exp": True,
"verify_aud": True,
"verify_iss": True
}
Session Management
Session Management
Secure session configuration:
Copy
Ask AI
# Session settings
SESSION_CONFIG = {
"ttl": 3600, # 1 hour
"sliding_window": True, # Extend on activity
"max_concurrent_sessions": 3, # Limit per user
"secure": True, # HTTPS only
"httponly": True, # No JS access
"samesite": "strict", # CSRF protection
"session_fixation_protection": True,
"regenerate_on_login": True
}
# Implement session timeout
async def check_session_activity(session_id: str):
session = await redis.get(f"session:{session_id}")
if not session:
raise SessionExpired()
last_activity = session.get("last_activity")
if datetime.utcnow() - last_activity > timedelta(hours=1):
await redis.delete(f"session:{session_id}")
raise SessionExpired()
Authorization Best Practices
Principle of Least Privilege
Copy
Ask AI
## Default deny, explicit allow
DEFAULT_PERMISSIONS = {
"user": ["viewer"], # Minimal by default
"admin": ["admin", "member", "viewer"]
}
## Grant minimum required permissions
async def onboard_user(user_id: str, role: str = "user"):
"""Add user with minimal permissions"""
# Default viewer access only
await openfga_client.write_tuples([{
"user": f"user:{user_id}",
"relation": "viewer",
"object": "organization:default"
}])
# Require explicit grant for elevated access
if role == "admin":
# Requires admin approval
await request_admin_approval(user_id)
Regular Permission Audits
Copy
Ask AI
#!/usr/bin/env python3
## audit-permissions.py
async def audit_permissions():
"""Monthly permission audit"""
# Find users with admin access
admins = await openfga_client.list_users(
relation="admin",
object="organization:default"
)
# Check if still needed
for admin in admins:
# Get user info
user = await get_user(admin)
# Check last activity
if user.last_login < datetime.utcnow() - timedelta(days=90):
logger.warning(f"Inactive admin: {admin}")
await notify_security_team(f"Remove admin access for {admin}?")
# Find overly permissive tuples
all_tuples = await openfga_client.read_tuples()
for tuple in all_tuples:
if tuple['relation'] == 'owner' and tuple['object'].startswith('organization:'):
# Multiple org owners is risky
owners = await openfga_client.list_users(
relation="owner",
object=tuple['object']
)
if len(owners) > 3:
logger.warning(f"Too many owners for {tuple['object']}: {owners}")
Resource Isolation
Copy
Ask AI
## Enforce organization boundaries
async def check_resource_access(
user_id: str,
resource_id: str,
action: str
):
"""Ensure users can only access resources in their org"""
# Get user's organizations
user_orgs = await openfga_client.list_objects(
user=f"user:{user_id}",
relation="member",
object_type="organization"
)
# Get resource's organization
resource_org = await get_resource_organization(resource_id)
# Check org membership
if resource_org not in user_orgs:
logger.security(
"cross_org_access_attempt",
user_id=user_id,
resource_id=resource_id,
user_orgs=user_orgs,
resource_org=resource_org
)
raise PermissionError("Cross-organization access denied")
# Check specific permission
allowed = await openfga_client.check_permission(
user=f"user:{user_id}",
relation=action,
object=resource_id
)
if not allowed:
raise PermissionError(f"Action '{action}' not allowed")
return True
Data Security Best Practices
Encryption
- At Rest
- In Transit
- Application-Level
PostgreSQL Encryption:Redis Encryption:Kubernetes PVC Encryption:
Copy
Ask AI
-- Enable transparent data encryption
ALTER SYSTEM SET ssl = on;
ALTER SYSTEM SET ssl_ciphers = 'HIGH:!aNULL:!MD5';
ALTER SYSTEM SET ssl_prefer_server_ciphers = on;
ALTER SYSTEM SET ssl_min_protocol_version = 'TLSv1.3';
-- Column-level encryption for sensitive data
CREATE EXTENSION IF NOT EXISTS pgcrypto;
CREATE TABLE users (
id UUID PRIMARY KEY,
email TEXT,
email_encrypted BYTEA DEFAULT pgp_sym_encrypt(email, current_setting('app.encryption_key'))
);
Copy
Ask AI
# redis.conf
tls-port 6380
tls-cert-file /etc/redis/certs/redis.crt
tls-key-file /etc/redis/certs/redis.key
tls-ca-cert-file /etc/redis/certs/ca.crt
tls-auth-clients yes
Copy
Ask AI
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: postgres-data
spec:
storageClassName: encrypted-gp3
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 100Gi
Data Minimization
Copy
Ask AI
## Only collect necessary data
class MessageRequest(BaseModel):
query: str # Required
# Don't collect: IP, user agent, etc. unless needed
## Automatic PII removal
def sanitize_logs(log_data: dict) -> dict:
"""Remove PII from logs"""
pii_fields = ['email', 'phone', 'ssn', 'credit_card']
sanitized = log_data.copy()
for field in pii_fields:
if field in sanitized:
sanitized[field] = '***REDACTED***'
return sanitized
## Data retention policies
async def enforce_retention_policy():
"""Delete old data according to policy"""
# Delete conversations older than 90 days
cutoff = datetime.utcnow() - timedelta(days=90)
await db.delete_conversations(older_than=cutoff)
# Anonymize audit logs older than 1 year
log_cutoff = datetime.utcnow() - timedelta(days=365)
await db.anonymize_audit_logs(older_than=log_cutoff)
Secure Data Deletion
Copy
Ask AI
async def secure_delete_user(user_id: str):
"""GDPR-compliant user deletion"""
# 1. Delete user data
await db.execute("DELETE FROM users WHERE id = %s", (user_id,))
await db.execute("DELETE FROM conversations WHERE user_id = %s", (user_id,))
await db.execute("DELETE FROM sessions WHERE user_id = %s", (user_id,))
# 2. Remove from authorization system
tuples = await openfga_client.read_tuples(user=f"user:{user_id}")
await openfga_client.delete_tuples(tuples)
# 3. Anonymize audit logs (keep for compliance)
await db.execute(
"UPDATE audit_logs SET user_id = 'deleted_user' WHERE user_id = %s",
(user_id,)
)
# 4. Clear caches
await redis.delete(f"user:{user_id}:*")
# 5. Log deletion
logger.audit("user_deleted", user_id=user_id, timestamp=datetime.utcnow())
# 6. Overwrite sensitive data (defense in depth)
# PostgreSQL VACUUM FULL reclaims space
await db.execute("VACUUM FULL users")
Network Security Best Practices
Network Policies
Copy
Ask AI
## Deny all by default
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: default-deny-all
namespace: mcp-server-langgraph
spec:
podSelector: {}
policyTypes:
- Ingress
- Egress
---
## Allow only necessary traffic
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: mcp-server-langgraph-policy
namespace: mcp-server-langgraph
spec:
podSelector:
matchLabels:
app: mcp-server-langgraph
policyTypes:
- Ingress
- Egress
ingress:
# Allow from ingress controller
- from:
- namespaceSelector:
matchLabels:
name: ingress-nginx
ports:
- protocol: TCP
port: 8000
egress:
# Allow to PostgreSQL
- to:
- podSelector:
matchLabels:
app: postgres
ports:
- protocol: TCP
port: 5432
# Allow to Redis
- to:
- podSelector:
matchLabels:
app: redis
ports:
- protocol: TCP
port: 6379
# Allow to Keycloak
- to:
- podSelector:
matchLabels:
app: keycloak
ports:
- protocol: TCP
port: 8080
# Allow DNS
- to:
- namespaceSelector:
matchLabels:
name: kube-system
- podSelector:
matchLabels:
k8s-app: kube-dns
ports:
- protocol: UDP
port: 53
# Allow HTTPS to external services (LLM APIs)
- to:
- namespaceSelector: {}
ports:
- protocol: TCP
port: 443
Rate Limiting
Copy
Ask AI
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
## Initialize limiter
limiter = Limiter(
key_func=get_remote_address,
default_limits=["100/minute"],
storage_uri="redis://redis:6379"
)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
## Per-endpoint limits
@app.post("/message")
@limiter.limit("30/minute") # More restrictive for expensive operations
async def send_message(request: Request):
pass
@app.post("/auth/login")
@limiter.limit("5/minute") # Prevent brute force
async def login(request: Request):
pass
## Per-user limits
@app.post("/message")
@limiter.limit("100/minute", key_func=lambda: get_current_user().id)
async def send_message(user: User = Depends(get_current_user)):
pass
DDoS Protection
Copy
Ask AI
## CloudFlare / AWS WAF rules
waf_rules:
- name: rate_limit_api
priority: 1
rule:
rate_limit:
limit: 2000
window: 300 # 5 minutes
action: block
- name: geo_blocking
priority: 2
rule:
geo_match:
block_countries:
- XX # Known malicious sources
- YY
- name: ip_reputation
priority: 3
rule:
ip_reputation:
block_high_risk: true
block_anonymous_proxies: true
block_tor_exit_nodes: true
- name: sql_injection_protection
priority: 4
rule:
sql_injection:
sensitivity: high
Container Security Best Practices
Secure Base Images
Copy
Ask AI
## Use minimal, verified base images
FROM python:3.12-slim-bookworm AS base
## Run as non-root
RUN groupadd -r appuser && useradd -r -g appuser appuser
## Install security updates
RUN apt-get update && \
apt-get upgrade -y && \
apt-get install -y --no-install-recommends \
ca-certificates \
&& \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
## Copy application
WORKDIR /app
COPY --chown=appuser:appuser . .
## Install dependencies
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv
RUN uv sync --frozen
## Switch to non-root user
USER appuser
## Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \
CMD python -c "import requests; requests.get('http://localhost:8000/health')"
## Run application
CMD ["python", "-m", "uvicorn", "mcp_server_langgraph.main:app", "--host", "0.0.0.0", "--port", "8000"]
Security Context
Copy
Ask AI
apiVersion: apps/v1
kind: Deployment
metadata:
name: mcp-server-langgraph
spec:
template:
spec:
# Pod security
securityContext:
runAsNonRoot: true
runAsUser: 1000
fsGroup: 1000
seccompProfile:
type: RuntimeDefault
containers:
- name: agent
image: mcp-server-langgraph:latest
# Container security
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
runAsNonRoot: true
runAsUser: 1000
capabilities:
drop:
- ALL
# Resource limits (prevent DoS)
resources:
requests:
cpu: 1000m
memory: 1Gi
limits:
cpu: 4000m
memory: 4Gi
# Read-only filesystem with specific writable paths
volumeMounts:
- name: tmp
mountPath: /tmp
- name: cache
mountPath: /.cache
volumes:
- name: tmp
emptyDir: {}
- name: cache
emptyDir: {}
Image Scanning
Copy
Ask AI
## .github/workflows/security.yml
name: Security Scan
on: [push, pull_request]
jobs:
scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Build image
run: docker build -t mcp-server-langgraph:${{ github.sha }} .
- name: Run Trivy scan
uses: aquasecurity/trivy-action@master
with:
image-ref: mcp-server-langgraph:${{ github.sha }}
format: 'sarif'
output: 'trivy-results.sarif'
severity: 'CRITICAL,HIGH'
- name: Upload results
uses: github/codeql-action/upload-sarif@v2
with:
sarif_file: 'trivy-results.sarif'
- name: Fail on critical vulnerabilities
uses: aquasecurity/trivy-action@master
with:
image-ref: mcp-server-langgraph:${{ github.sha }}
exit-code: '1'
severity: 'CRITICAL'
Secret Management Best Practices
Never Commit Secrets
Copy
Ask AI
## .gitignore
.env
.env.*
*.pem
*.key
credentials.json
secrets.yaml
config/secrets/
## Use git-secrets to prevent accidental commits
git secrets --install
git secrets --register-aws
git secrets --add 'sk-[a-zA-Z0-9]{48}' # OpenAI keys
git secrets --add 'xoxb-[0-9]{12}-[0-9]{12}-[a-zA-Z0-9]{24}' # Slack tokens
Use Secret Managers
Copy
Ask AI
## Use Infisical, not environment variables
from mcp_server_langgraph.core.config import settings
## BAD - hardcoded
API_KEY = "sk-1234567890abcdef"
## BAD - environment variable (better but not ideal)
API_KEY = os.getenv("API_KEY")
## GOOD - secret manager
API_KEY = settings.anthropic_api_key # Loaded from Infisical
Rotate Secrets Regularly
Copy
Ask AI
## Automated secret rotation
async def rotate_jwt_secret():
"""Rotate JWT signing key monthly"""
# Generate new key
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=4096
)
# Store in Infisical
await infisical_client.create_secret(
secret_name=f"JWT_PRIVATE_KEY_{datetime.now().strftime('%Y%m')}",
secret_value=private_key.private_bytes(...).decode(),
project_id=settings.infisical_project_id,
environment="production"
)
# Update application config
await update_config("JWT_KEY_ID", f"key-{datetime.now().strftime('%Y%m')}")
# Keep old key for 24h for token validation
await schedule_key_deletion(old_key_id, delay_hours=24)
## Schedule rotation
schedule.every().month.do(rotate_jwt_secret)
Monitoring & Alerting
Security Metrics
Copy
Ask AI
## Failed authentication attempts
rate(auth_failed_attempts_total[5m]) > 10
## Permission denials
rate(authorization_denied_total[5m]) > 50
## Suspicious activity
sum(rate(http_requests_total{status=~"4..|5.."}[5m])) > 100
## Token validation failures
rate(jwt_validation_failed_total[1m]) > 5
Security Alerts
Copy
Ask AI
## prometheus-alerts.yaml
groups:
- name: security_alerts
rules:
- alert: BruteForceAttack
expr: rate(auth_failed_attempts_total[5m]) > 10
for: 2m
annotations:
summary: "Potential brute force attack detected"
description: "{{ $value }} failed login attempts per second"
- alert: UnauthorizedAccessAttempts
expr: rate(authorization_denied_total[5m]) > 50
for: 5m
annotations:
summary: "High rate of authorization failures"
- alert: SuspiciousAPIUsage
expr: sum(rate(http_requests_total{status="403"}[5m])) > 20
for: 5m
annotations:
summary: "Unusual number of forbidden requests"
- alert: TokenValidationFailures
expr: rate(jwt_validation_failed_total[1m]) > 5
for: 1m
annotations:
summary: "Token validation failures detected"
Compliance Checklist
GDPR
GDPR
- Data processing agreement
- Privacy policy published
- Consent management
- Right to access implemented
- Right to erasure implemented
- Data portability
- Breach notification procedure
- Data Protection Officer assigned
- DPIA completed
SOC 2
SOC 2
- Access controls documented
- Change management process
- Incident response plan
- Business continuity plan
- Vendor management
- Risk assessment
- Security awareness training
- Audit logging enabled
- Penetration testing
HIPAA
HIPAA
- BAA with vendors
- Encryption at rest
- Encryption in transit
- Access controls
- Audit trails
- Breach notification
- Risk analysis
- Security officer assigned
PCI DSS
PCI DSS
- Cardholder data isolated
- Strong cryptography
- Secure network architecture
- Access control measures
- Network monitoring
- Vulnerability management
- Security testing
- Information security policy
Security Testing
Penetration Testing
Copy
Ask AI
## OWASP ZAP automated scan
docker run -t owasp/zap2docker-stable zap-baseline.py \
-t https://api.yourdomain.com \
-r zap-report.html
## Manual testing checklist
## 1. Authentication bypass
## 2. Authorization flaws
## 3. SQL injection
## 4. XSS
## 5. CSRF
## 6. API abuse
## 7. Session management
## 8. Cryptography issues
Security Code Review
Copy
Ask AI
## .github/workflows/security-review.yml
name: Security Code Review
on: [pull_request]
jobs:
security:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Bandit security scan
run: |
uv tool install bandit
bandit -r src/ -f json -o bandit-report.json
- name: Safety dependency check
run: |
uv tool install safety
safety check --json --output safety-report.json
- name: Semgrep scan
run: |
uv tool install semgrep
semgrep --config=auto --json -o semgrep-report.json
Next Steps
Security Overview
Learn security architecture
Authentication
Set up authentication
Disaster Recovery
Backup and recovery
Production Checklist
Pre-deployment security
Security First: Follow these best practices to protect your MCP Server and user data!