Documentation Index
Fetch the complete documentation index at: https://mcp-server-langgraph.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Implement these security best practices to harden your MCP Server deployment for production. This guide covers configuration, deployment, and operational security.Security is an ongoing process, not a one-time setup. Regularly review and update your security posture.
Authentication Best Practices
Use Strong Authentication
Enforce MFA
Enforce MFA
Always require multi-factor authentication in production:Verify MFA in application:
# Keycloak configuration
keycloak:
realm: langgraph
mfa:
required: true
allowed_methods:
- totp # Authenticator app
- webauthn # Hardware keys
grace_period: 0 # Enforce immediately
@app.post("/message")
async def send_message(user: User = Depends(get_current_user)):
if not user.mfa_verified:
raise HTTPException(
status_code=403,
detail="MFA required for this operation"
)
# Process request
Strong Password Policy
Strong Password Policy
Configure password requirements:Password strength validation:
# Keycloak password policy
password_policy:
minimum_length: 12
require_uppercase: true
require_lowercase: true
require_numbers: true
require_special_chars: true
password_history: 10
max_age_days: 90
min_age_days: 1
not_username: true
not_email: true
import re
from zxcvbn import zxcvbn
def validate_password(password: str, user_inputs: list) -> bool:
# Check length
if len(password) < 12:
raise ValueError("Password must be at least 12 characters")
# Check complexity
if not re.search(r"[A-Z]", password):
raise ValueError("Password must contain uppercase letter")
if not re.search(r"[a-z]", password):
raise ValueError("Password must contain lowercase letter")
if not re.search(r"[0-9]", password):
raise ValueError("Password must contain number")
if not re.search(r"[!@#$%^&*]", password):
raise ValueError("Password must contain special character")
# Check strength
result = zxcvbn(password, user_inputs=user_inputs)
if result['score'] < 3:
raise ValueError("Password is too weak")
return True
Token Security
Token Security
Configure secure JWT tokens:
# Short-lived access tokens
ACCESS_TOKEN_EXPIRE_MINUTES = 60 # 1 hour
# Longer refresh tokens
REFRESH_TOKEN_EXPIRE_DAYS = 7
# Use RS256 (asymmetric)
ALGORITHM = "RS256"
# Rotate signing keys monthly
KEY_ROTATION_DAYS = 30
# JWT configuration
jwt_config = {
"algorithm": "RS256",
"access_token_expire": 3600,
"refresh_token_expire": 604800,
"issuer": "https://keycloak.yourdomain.com",
"audience": "mcp-server-langgraph",
"verify_signature": True,
"verify_exp": True,
"verify_aud": True,
"verify_iss": True
}
Session Management
Session Management
Secure session configuration:
# Session settings
SESSION_CONFIG = {
"ttl": 3600, # 1 hour
"sliding_window": True, # Extend on activity
"max_concurrent_sessions": 3, # Limit per user
"secure": True, # HTTPS only
"httponly": True, # No JS access
"samesite": "strict", # CSRF protection
"session_fixation_protection": True,
"regenerate_on_login": True
}
# Implement session timeout
async def check_session_activity(session_id: str):
session = await redis.get(f"session:{session_id}")
if not session:
raise SessionExpired()
last_activity = session.get("last_activity")
if datetime.utcnow() - last_activity > timedelta(hours=1):
await redis.delete(f"session:{session_id}")
raise SessionExpired()
Authorization Best Practices
Principle of Least Privilege
## Default deny, explicit allow
DEFAULT_PERMISSIONS = {
"user": ["viewer"], # Minimal by default
"admin": ["admin", "member", "viewer"]
}
## Grant minimum required permissions
async def onboard_user(user_id: str, role: str = "user"):
"""Add user with minimal permissions"""
# Default viewer access only
await openfga_client.write_tuples([{
"user": f"user:{user_id}",
"relation": "viewer",
"object": "organization:default"
}])
# Require explicit grant for elevated access
if role == "admin":
# Requires admin approval
await request_admin_approval(user_id)
Regular Permission Audits
#!/usr/bin/env python3
## audit-permissions.py
async def audit_permissions():
"""Monthly permission audit"""
# Find users with admin access
admins = await openfga_client.list_users(
relation="admin",
object="organization:default"
)
# Check if still needed
for admin in admins:
# Get user info
user = await get_user(admin)
# Check last activity
if user.last_login < datetime.utcnow() - timedelta(days=90):
logger.warning(f"Inactive admin: {admin}")
await notify_security_team(f"Remove admin access for {admin}?")
# Find overly permissive tuples
all_tuples = await openfga_client.read_tuples()
for tuple in all_tuples:
if tuple['relation'] == 'owner' and tuple['object'].startswith('organization:'):
# Multiple org owners is risky
owners = await openfga_client.list_users(
relation="owner",
object=tuple['object']
)
if len(owners) > 3:
logger.warning(f"Too many owners for {tuple['object']}: {owners}")
Resource Isolation
## Enforce organization boundaries
async def check_resource_access(
user_id: str,
resource_id: str,
action: str
):
"""Ensure users can only access resources in their org"""
# Get user's organizations
user_orgs = await openfga_client.list_objects(
user=f"user:{user_id}",
relation="member",
object_type="organization"
)
# Get resource's organization
resource_org = await get_resource_organization(resource_id)
# Check org membership
if resource_org not in user_orgs:
logger.security(
"cross_org_access_attempt",
user_id=user_id,
resource_id=resource_id,
user_orgs=user_orgs,
resource_org=resource_org
)
raise PermissionError("Cross-organization access denied")
# Check specific permission
allowed = await openfga_client.check_permission(
user=f"user:{user_id}",
relation=action,
object=resource_id
)
if not allowed:
raise PermissionError(f"Action '{action}' not allowed")
return True
Data Security Best Practices
Encryption
- At Rest
- In Transit
- Application-Level
PostgreSQL Encryption:Redis Encryption:Kubernetes PVC Encryption:
-- Enable transparent data encryption
ALTER SYSTEM SET ssl = on;
ALTER SYSTEM SET ssl_ciphers = 'HIGH:!aNULL:!MD5';
ALTER SYSTEM SET ssl_prefer_server_ciphers = on;
ALTER SYSTEM SET ssl_min_protocol_version = 'TLSv1.3';
-- Column-level encryption for sensitive data
CREATE EXTENSION IF NOT EXISTS pgcrypto;
CREATE TABLE users (
id UUID PRIMARY KEY,
email TEXT,
email_encrypted BYTEA DEFAULT pgp_sym_encrypt(email, current_setting('app.encryption_key'))
);
# redis.conf
tls-port 6380
tls-cert-file /etc/redis/certs/redis.crt
tls-key-file /etc/redis/certs/redis.key
tls-ca-cert-file /etc/redis/certs/ca.crt
tls-auth-clients yes
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: postgres-data
spec:
storageClassName: encrypted-gp3
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 100Gi
TLS Configuration:Service Mesh (mTLS):
# Ingress with strong TLS
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: mcp-server-langgraph
annotations:
nginx.ingress.kubernetes.io/ssl-protocols: "TLSv1.3"
nginx.ingress.kubernetes.io/ssl-ciphers: "ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256"
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
nginx.ingress.kubernetes.io/hsts: "true"
nginx.ingress.kubernetes.io/hsts-max-age: "31536000"
nginx.ingress.kubernetes.io/hsts-include-subdomains: "true"
spec:
tls:
- hosts:
- api.yourdomain.com
secretName: langgraph-tls
# Istio PeerAuthentication
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
name: default
namespace: mcp-server-langgraph
spec:
mtls:
mode: STRICT
Field-Level Encryption:
from cryptography.fernet import Fernet
from functools import lru_cache
@lru_cache()
def get_cipher():
key = settings.field_encryption_key.encode()
return Fernet(key)
class EncryptedModel(BaseModel):
"""Base model with encrypted fields"""
@classmethod
def encrypt_field(cls, value: str) -> str:
cipher = get_cipher()
return cipher.encrypt(value.encode()).decode()
@classmethod
def decrypt_field(cls, value: str) -> str:
cipher = get_cipher()
return cipher.decrypt(value.encode()).decode()
class User(EncryptedModel):
id: str
email: str
_email_encrypted: str
def __init__(self, **data):
if 'email' in data:
data['_email_encrypted'] = self.encrypt_field(data['email'])
super().__init__(**data)
@property
def email(self) -> str:
return self.decrypt_field(self._email_encrypted)
Data Minimization
## Only collect necessary data
class MessageRequest(BaseModel):
query: str # Required
# Don't collect: IP, user agent, etc. unless needed
## Automatic PII removal
def sanitize_logs(log_data: dict) -> dict:
"""Remove PII from logs"""
pii_fields = ['email', 'phone', 'ssn', 'credit_card']
sanitized = log_data.copy()
for field in pii_fields:
if field in sanitized:
sanitized[field] = '***REDACTED***'
return sanitized
## Data retention policies
async def enforce_retention_policy():
"""Delete old data according to policy"""
# Delete conversations older than 90 days
cutoff = datetime.utcnow() - timedelta(days=90)
await db.delete_conversations(older_than=cutoff)
# Anonymize audit logs older than 1 year
log_cutoff = datetime.utcnow() - timedelta(days=365)
await db.anonymize_audit_logs(older_than=log_cutoff)
Secure Data Deletion
async def secure_delete_user(user_id: str):
"""GDPR-compliant user deletion"""
# 1. Delete user data
await db.execute("DELETE FROM users WHERE id = %s", (user_id,))
await db.execute("DELETE FROM conversations WHERE user_id = %s", (user_id,))
await db.execute("DELETE FROM sessions WHERE user_id = %s", (user_id,))
# 2. Remove from authorization system
tuples = await openfga_client.read_tuples(user=f"user:{user_id}")
await openfga_client.delete_tuples(tuples)
# 3. Anonymize audit logs (keep for compliance)
await db.execute(
"UPDATE audit_logs SET user_id = 'deleted_user' WHERE user_id = %s",
(user_id,)
)
# 4. Clear caches
await redis.delete(f"user:{user_id}:*")
# 5. Log deletion
logger.audit("user_deleted", user_id=user_id, timestamp=datetime.utcnow())
# 6. Overwrite sensitive data (defense in depth)
# PostgreSQL VACUUM FULL reclaims space
await db.execute("VACUUM FULL users")
Network Security Best Practices
Network Policies
## Deny all by default
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: default-deny-all
namespace: mcp-server-langgraph
spec:
podSelector: {}
policyTypes:
- Ingress
- Egress
---
## Allow only necessary traffic
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: mcp-server-langgraph-policy
namespace: mcp-server-langgraph
spec:
podSelector:
matchLabels:
app: mcp-server-langgraph
policyTypes:
- Ingress
- Egress
ingress:
# Allow from ingress controller
- from:
- namespaceSelector:
matchLabels:
name: ingress-nginx
ports:
- protocol: TCP
port: 8000
egress:
# Allow to PostgreSQL
- to:
- podSelector:
matchLabels:
app: postgres
ports:
- protocol: TCP
port: 5432
# Allow to Redis
- to:
- podSelector:
matchLabels:
app: redis
ports:
- protocol: TCP
port: 6379
# Allow to Keycloak
- to:
- podSelector:
matchLabels:
app: keycloak
ports:
- protocol: TCP
port: 8080
# Allow DNS
- to:
- namespaceSelector:
matchLabels:
name: kube-system
- podSelector:
matchLabels:
k8s-app: kube-dns
ports:
- protocol: UDP
port: 53
# Allow HTTPS to external services (LLM APIs)
- to:
- namespaceSelector: {}
ports:
- protocol: TCP
port: 443
Rate Limiting
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
## Initialize limiter
limiter = Limiter(
key_func=get_remote_address,
default_limits=["100/minute"],
storage_uri="redis://redis:6379"
)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
## Per-endpoint limits
@app.post("/message")
@limiter.limit("30/minute") # More restrictive for expensive operations
async def send_message(request: Request):
pass
@app.post("/auth/login")
@limiter.limit("5/minute") # Prevent brute force
async def login(request: Request):
pass
## Per-user limits
@app.post("/message")
@limiter.limit("100/minute", key_func=lambda: get_current_user().id)
async def send_message(user: User = Depends(get_current_user)):
pass
DDoS Protection
## CloudFlare / AWS WAF rules
waf_rules:
- name: rate_limit_api
priority: 1
rule:
rate_limit:
limit: 2000
window: 300 # 5 minutes
action: block
- name: geo_blocking
priority: 2
rule:
geo_match:
block_countries:
- XX # Known malicious sources
- YY
- name: ip_reputation
priority: 3
rule:
ip_reputation:
block_high_risk: true
block_anonymous_proxies: true
block_tor_exit_nodes: true
- name: sql_injection_protection
priority: 4
rule:
sql_injection:
sensitivity: high
Container Security Best Practices
Secure Base Images
## Use minimal, verified base images
FROM python:3.12-slim-bookworm AS base
## Run as non-root
RUN groupadd -r appuser && useradd -r -g appuser appuser
## Install security updates
RUN apt-get update && \
apt-get upgrade -y && \
apt-get install -y --no-install-recommends \
ca-certificates \
&& \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
## Copy application
WORKDIR /app
COPY --chown=appuser:appuser . .
## Install dependencies
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv
RUN uv sync --frozen
## Switch to non-root user
USER appuser
## Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \
CMD python -c "import requests; requests.get('http://localhost:8000/health')"
## Run application
CMD ["python", "-m", "uvicorn", "mcp_server_langgraph.main:app", "--host", "0.0.0.0", "--port", "8000"]
Security Context
apiVersion: apps/v1
kind: Deployment
metadata:
name: mcp-server-langgraph
spec:
template:
spec:
# Pod security
securityContext:
runAsNonRoot: true
runAsUser: 1000
fsGroup: 1000
seccompProfile:
type: RuntimeDefault
containers:
- name: agent
image: mcp-server-langgraph:latest
# Container security
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
runAsNonRoot: true
runAsUser: 1000
capabilities:
drop:
- ALL
# Resource limits (prevent DoS)
resources:
requests:
cpu: 1000m
memory: 1Gi
limits:
cpu: 4000m
memory: 4Gi
# Read-only filesystem with specific writable paths
volumeMounts:
- name: tmp
mountPath: /tmp
- name: cache
mountPath: /.cache
volumes:
- name: tmp
emptyDir: {}
- name: cache
emptyDir: {}
Image Scanning
## .github/workflows/security.yml
name: Security Scan
on: [push, pull_request]
jobs:
scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Build image
run: docker build -t mcp-server-langgraph:${{ github.sha }} .
- name: Run Trivy scan
uses: aquasecurity/trivy-action@master
with:
image-ref: mcp-server-langgraph:${{ github.sha }}
format: 'sarif'
output: 'trivy-results.sarif'
severity: 'CRITICAL,HIGH'
- name: Upload results
uses: github/codeql-action/upload-sarif@v2
with:
sarif_file: 'trivy-results.sarif'
- name: Fail on critical vulnerabilities
uses: aquasecurity/trivy-action@master
with:
image-ref: mcp-server-langgraph:${{ github.sha }}
exit-code: '1'
severity: 'CRITICAL'
Secret Management Best Practices
Never Commit Secrets
## .gitignore
.env
.env.*
*.pem
*.key
credentials.json
secrets.yaml
config/secrets/
## Use git-secrets to prevent accidental commits
git secrets --install
git secrets --register-aws
git secrets --add 'sk-[a-zA-Z0-9]{48}' # OpenAI keys
git secrets --add 'xoxb-[0-9]{12}-[0-9]{12}-[a-zA-Z0-9]{24}' # Slack tokens
Use Secret Managers
## Use Infisical, not environment variables
from mcp_server_langgraph.core.config import settings
## BAD - hardcoded
API_KEY = "sk-1234567890abcdef"
## BAD - environment variable (better but not ideal)
API_KEY = os.getenv("API_KEY")
## GOOD - secret manager
API_KEY = settings.anthropic_api_key # Loaded from Infisical
Rotate Secrets Regularly
## Automated secret rotation
async def rotate_jwt_secret():
"""Rotate JWT signing key monthly"""
# Generate new key
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=4096
)
# Store in Infisical
await infisical_client.create_secret(
secret_name=f"JWT_PRIVATE_KEY_{datetime.now().strftime('%Y%m')}",
secret_value=private_key.private_bytes(...).decode(),
project_id=settings.infisical_project_id,
environment="production"
)
# Update application config
await update_config("JWT_KEY_ID", f"key-{datetime.now().strftime('%Y%m')}")
# Keep old key for 24h for token validation
await schedule_key_deletion(old_key_id, delay_hours=24)
## Schedule rotation
schedule.every().month.do(rotate_jwt_secret)
Monitoring & Alerting
Security Metrics
## Failed authentication attempts
rate(auth_failed_attempts_total[5m]) > 10
## Permission denials
rate(authorization_denied_total[5m]) > 50
## Suspicious activity
sum(rate(http_requests_total{status=~"4..|5.."}[5m])) > 100
## Token validation failures
rate(jwt_validation_failed_total[1m]) > 5
Security Alerts
## prometheus-alerts.yaml
groups:
- name: security_alerts
rules:
- alert: BruteForceAttack
expr: rate(auth_failed_attempts_total[5m]) > 10
for: 2m
annotations:
summary: "Potential brute force attack detected"
description: "{{ $value }} failed login attempts per second"
- alert: UnauthorizedAccessAttempts
expr: rate(authorization_denied_total[5m]) > 50
for: 5m
annotations:
summary: "High rate of authorization failures"
- alert: SuspiciousAPIUsage
expr: sum(rate(http_requests_total{status="403"}[5m])) > 20
for: 5m
annotations:
summary: "Unusual number of forbidden requests"
- alert: TokenValidationFailures
expr: rate(jwt_validation_failed_total[1m]) > 5
for: 1m
annotations:
summary: "Token validation failures detected"
Compliance Checklist
GDPR
GDPR
- Data processing agreement
- Privacy policy published
- Consent management
- Right to access implemented
- Right to erasure implemented
- Data portability
- Breach notification procedure
- Data Protection Officer assigned
- DPIA completed
SOC 2
SOC 2
- Access controls documented
- Change management process
- Incident response plan
- Business continuity plan
- Vendor management
- Risk assessment
- Security awareness training
- Audit logging enabled
- Penetration testing
HIPAA
HIPAA
- BAA with vendors
- Encryption at rest
- Encryption in transit
- Access controls
- Audit trails
- Breach notification
- Risk analysis
- Security officer assigned
PCI DSS
PCI DSS
- Cardholder data isolated
- Strong cryptography
- Secure network architecture
- Access control measures
- Network monitoring
- Vulnerability management
- Security testing
- Information security policy
Security Testing
Penetration Testing
## OWASP ZAP automated scan
docker run -t owasp/zap2docker-stable zap-baseline.py \
-t https://api.yourdomain.com \
-r zap-report.html
## Manual testing checklist
## 1. Authentication bypass
## 2. Authorization flaws
## 3. SQL injection
## 4. XSS
## 5. CSRF
## 6. API abuse
## 7. Session management
## 8. Cryptography issues
Security Code Review
## .github/workflows/security-review.yml
name: Security Code Review
on: [pull_request]
jobs:
security:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Bandit security scan
run: |
uv tool install bandit
bandit -r src/ -f json -o bandit-report.json
- name: Safety dependency check
run: |
uv tool install safety
safety check --json --output safety-report.json
- name: Semgrep scan
run: |
uv tool install semgrep
semgrep --config=auto --json -o semgrep-report.json
Next Steps
Security Overview
Learn security architecture
Authentication
Set up authentication
Disaster Recovery
Backup and recovery
Production Checklist
Pre-deployment security
Security First: Follow these best practices to protect your MCP Server and user data!