Overview
The MCP Server with LangGraph implements defense-in-depth security with multiple layers of protection. This guide covers the security architecture, threat model, and mitigation strategies.
Security is everyone’s responsibility . Follow all security best practices and stay informed about vulnerabilities.
Security Architecture
Defense-in-Depth Layers
Layer 1: Network
Layer 2: Authentication
Layer 3: Authorization
Layer 4: Application
Layer 5: Data
Network Security
TLS 1.3 encryption in transit
mTLS between services
Network policies (Kubernetes)
VPC isolation
DDoS protection
WAF rules
# Network Policy
apiVersion : networking.k8s.io/v1
kind : NetworkPolicy
metadata :
name : mcp-server-langgraph
spec :
podSelector :
matchLabels :
app : mcp-server-langgraph
policyTypes :
- Ingress
- Egress
ingress :
- from :
- podSelector :
matchLabels :
app : nginx-ingress
ports :
- protocol : TCP
port : 8000
egress :
- to :
- podSelector :
matchLabels :
app : postgres
ports :
- protocol : TCP
port : 5432
Threat Model
STRIDE Analysis
Spoofing Threats :
Fake authentication tokens
Session hijacking
Identity theft
Mitigations :
JWT signature verification
JWKS rotation
Secure session cookies
MFA enforcement
Tampering Threats :
Request manipulation
Token modification
Database injection
Mitigations :
Input validation
Parameterized queries
JWT signature checks
Integrity checks
Repudiation Threats :
Denied actions
Missing audit trail
Log tampering
Mitigations :
Comprehensive logging
Immutable audit logs
Digital signatures
Centralized logging
Information Disclosure Threats :
Data leaks
Credential exposure
PII disclosure
Mitigations :
Data encryption
Secret management
Access controls
Data masking
Denial of Service Threats :
Resource exhaustion
DDoS attacks
API flooding
Mitigations :
Rate limiting
Auto-scaling
WAF rules
Circuit breakers
Elevation of Privilege Threats :
Privilege escalation
Role manipulation
Authorization bypass
Mitigations :
Fine-grained permissions
Least privilege principle
Regular audits
Permission boundaries
Authentication Security
JWT Security
Token Structure :
{
"header" : {
"alg" : "RS256" ,
"typ" : "JWT" ,
"kid" : "key-2025-10"
},
"payload" : {
"sub" : "user-123" ,
"iss" : "https://keycloak.yourdomain.com/realms/langgraph" ,
"aud" : "mcp-server-langgraph" ,
"exp" : 1728737200 ,
"iat" : 1728733600 ,
"roles" : [ "user" , "chat-executor" ],
"session_id" : "sess_abc123"
}
}
Security Measures :
RS256 algorithm (asymmetric)
Short expiry (1 hour)
Audience validation
Issuer validation
Key rotation (monthly)
Session Security
## Secure session configuration
SESSION_COOKIE_SECURE = T rue # HTTPS only
SESSION_COOKIE_HTTPONLY = T rue # No JavaScript access
SESSION_COOKIE_SAMESITE = "Strict" # CSRF protection
SESSION_TTL = 3600 # 1 hour
SESSION_SLIDING_WINDOW = T rue # Extend on activity
SESSION_MAX_CONCURRENT = 3 # Limit sessions per user
Session Fixation Protection :
## Regenerate session ID on login
async def login ( username : str , password : str ):
user = await authenticate(username, password)
# Delete old sessions
await session_mgr.delete_user_sessions(user.id)
# Create new session
session = await session_mgr.create_session(
user_id = user.id,
metadata = { "login_time" : datetime.utcnow()}
)
return session
Authorization Security
Principle of Least Privilege
Default Deny :
## Deny by default, explicit allow
async def check_access ( user_id : str , resource : str , action : str ):
# Check explicit permission
allowed = await openfga_client.check_permission(
user = f "user: { user_id } " ,
relation = action,
object = resource
)
if not allowed:
logger.warning( f "Access denied: { user_id } -> { action } { resource } " )
raise PermissionError ( "Access denied" )
return True
Role Hierarchies :
// OpenFGA model
type organization
relations
define admin: [user]
define member: [user] or admin // Admins are members
define viewer: [user] or member // Members are viewers
Permission Boundaries
## Enforce organization boundaries
async def check_org_access ( user_id : str , resource_id : str ):
# Get user's organization
user_orgs = await openfga_client.list_objects(
user = f "user: { user_id } " ,
relation = "member" ,
object_type = "organization"
)
# Get resource's organization
resource_org = await get_resource_org(resource_id)
# Verify match
if resource_org not in user_orgs:
raise PermissionError ( "Cross-organization access denied" )
Data Security
Encryption at Rest
Database Encryption :
## PostgreSQL with encryption
apiVersion : v1
kind : PersistentVolumeClaim
metadata :
name : postgres-data
spec :
storageClassName : encrypted-gp3
accessModes :
- ReadWriteOnce
resources :
requests :
storage : 100Gi
Application-Level Encryption :
from cryptography.fernet import Fernet
class EncryptedField :
"""Encrypt sensitive fields before storing"""
def __init__ ( self , key : bytes ):
self .cipher = Fernet(key)
def encrypt ( self , value : str ) -> str :
return self .cipher.encrypt(value.encode()).decode()
def decrypt ( self , value : str ) -> str :
return self .cipher.decrypt(value.encode()).decode()
## Usage
cipher = EncryptedField(settings.encryption_key)
user.email = cipher.encrypt( "alice@example.com" )
Encryption in Transit
TLS Configuration :
## Ingress with TLS
apiVersion : networking.k8s.io/v1
kind : Ingress
metadata :
name : mcp-server-langgraph
annotations :
cert-manager.io/cluster-issuer : letsencrypt-prod
nginx.ingress.kubernetes.io/ssl-redirect : "true"
nginx.ingress.kubernetes.io/force-ssl-redirect : "true"
spec :
tls :
- hosts :
- api.yourdomain.com
secretName : langgraph-tls
rules :
- host : api.yourdomain.com
http :
paths :
- path : /
pathType : Prefix
backend :
service :
name : mcp-server-langgraph
port :
number : 8000
mTLS Between Services :
## Service mesh (Istio)
apiVersion : security.istio.io/v1beta1
kind : PeerAuthentication
metadata :
name : default
spec :
mtls :
mode : STRICT
Secret Management
Infisical Integration :
from mcp_server_langgraph.core.config import settings
## Secrets automatically loaded from Infisical
api_key = settings.anthropic_api_key # Never hardcoded
db_password = settings.redis_password # Never in Git
Kubernetes Secrets :
## External Secrets Operator
apiVersion : external-secrets.io/v1beta1
kind : ExternalSecret
metadata :
name : langgraph-secrets
spec :
refreshInterval : 1h
secretStoreRef :
name : infisical
kind : SecretStore
target :
name : mcp-server-langgraph-secrets
data :
- secretKey : ANTHROPIC_API_KEY
remoteRef :
key : ANTHROPIC_API_KEY
Application Security
from pydantic import BaseModel, validator, constr
from typing import Optional
class MessageRequest ( BaseModel ):
query: constr( min_length = 1 , max_length = 10000 )
conversation_id: Optional[ str ] = None
@validator ( 'query' )
def sanitize_query ( cls , v ):
# Remove potential injection attempts
if any (char in v for char in [ '<' , '>' , ';' , '--' ]):
raise ValueError ( "Invalid characters in query" )
return v.strip()
@validator ( 'conversation_id' )
def validate_conversation_id ( cls , v ):
if v and not v.startswith( 'conv_' ):
raise ValueError ( "Invalid conversation ID format" )
return v
SQL Injection Prevention
## NEVER do this
query = f "SELECT * FROM users WHERE username = '{username}'" # VULNERABLE
## Always use parameterized queries
query = "SELECT * FROM users WHERE username = %s"
cursor . execute (query, (username,)) # SAFE
XSS Prevention
from html import escape
def sanitize_output ( text : str ) -> str :
"""Escape HTML to prevent XSS"""
return escape(text)
## Output encoding
response = {
"message" : sanitize_output(user_input)
}
CSRF Protection
## CSRF token validation
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer
security = HTTPBearer()
async def verify_csrf (
token : str = Depends(security),
session : Session = Depends(get_session)
):
if not session.csrf_token:
raise HTTPException( 403 , "CSRF token missing" )
if token != session.csrf_token:
raise HTTPException( 403 , "CSRF token invalid" )
Rate Limiting
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter( key_func = get_remote_address)
@app.post ( "/message" )
@limiter.limit ( "100/minute" )
async def send_message ( request : Request):
# Process message
pass
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
app = FastAPI()
## CORS
app.add_middleware(
CORSMiddleware,
allow_origins = [ "https://yourdomain.com" ],
allow_credentials = True ,
allow_methods = [ "GET" , "POST" ],
allow_headers = [ "Authorization" , "Content-Type" ],
)
## Trusted hosts
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts = [ "yourdomain.com" , "*.yourdomain.com" ]
)
## Security headers
@app.middleware ( "http" )
async def add_security_headers ( request , call_next ):
response = await call_next(request)
response.headers[ "X-Content-Type-Options" ] = "nosniff"
response.headers[ "X-Frame-Options" ] = "DENY"
response.headers[ "X-XSS-Protection" ] = "1; mode=block"
response.headers[ "Strict-Transport-Security" ] = "max-age=31536000; includeSubDomains"
response.headers[ "Content-Security-Policy" ] = "default-src 'self'"
response.headers[ "Referrer-Policy" ] = "strict-origin-when-cross-origin"
response.headers[ "Permissions-Policy" ] = "geolocation=(), microphone=(), camera=()"
return response
Audit Logging
What to Log
Login attempts (success/failure)
Logout events
Token generation
MFA challenges
Password changes
Account lockouts
logger.security(
"login_success" ,
user_id = user.id,
username = user.username,
ip_address = request.client.host,
user_agent = request.headers.get( "user-agent" ),
timestamp = datetime.utcnow()
)
Permission checks (allowed/denied)
Role changes
Permission grants/revokes
Privilege escalation attempts
logger.security(
"permission_denied" ,
user_id = user.id,
action = "executor" ,
resource = "tool:chat" ,
reason = "no_permission" ,
timestamp = datetime.utcnow()
)
Sensitive data access
Data modifications
Export operations
Deletion events
logger.audit(
"data_export" ,
user_id = user.id,
resource_type = "conversations" ,
record_count = 100 ,
timestamp = datetime.utcnow()
)
User creation/deletion
Configuration changes
Permission model updates
System configuration
logger.audit(
"user_created" ,
admin_user_id = admin.id,
new_user_id = new_user.id,
roles = new_user.roles,
timestamp = datetime.utcnow()
)
Secure Logging
import structlog
## Structured logging
logger = structlog.get_logger()
## Mask sensitive data
def mask_sensitive ( data : dict ) -> dict :
masked = data.copy()
for key in [ 'password' , 'token' , 'api_key' , 'secret' ]:
if key in masked:
masked[key] = '***REDACTED***'
return masked
## Log with context
logger.info(
"api_request" ,
** mask_sensitive({
"user_id" : user.id,
"endpoint" : "/message" ,
"params" : request_data
})
)
Compliance
GDPR Compliance
## Right to erasure
async def delete_user_data ( user_id : str ):
"""Permanently delete all user data"""
# Delete from databases
await db.delete_user(user_id)
await db.delete_conversations(user_id)
await db.delete_sessions(user_id)
# Remove from OpenFGA
tuples = await openfga_client.read_tuples( user = f "user: { user_id } " )
await openfga_client.delete_tuples(tuples)
# Anonymize logs
await anonymize_logs(user_id)
logger.audit( "gdpr_data_deletion" , user_id = user_id)
SOC 2 Controls
Access controls (AC)
Change management (CM)
Logical security (LS)
Operations (OP)
Risk management (RM)
Evidence Collection :
## Automated evidence collection
async def collect_compliance_evidence ():
evidence = {
"access_reviews" : await get_access_reviews(),
"security_patches" : await get_patch_status(),
"audit_logs" : await get_audit_summary(),
"backup_tests" : await get_dr_test_results(),
"vulnerability_scans" : await get_scan_results()
}
await store_evidence(evidence)
Vulnerability Management
Dependency Scanning
## .github/workflows/security-scan.yml
name : Security Scan
on : [ push , pull_request ]
jobs :
scan :
runs-on : ubuntu-latest
steps :
- uses : actions/checkout@v3
- name : Run Trivy
uses : aquasecurity/trivy-action@master
with :
scan-type : 'fs'
scan-ref : '.'
severity : 'CRITICAL,HIGH'
- name : Run Bandit
run : |
uv tool install bandit
bandit -r src/ -f json -o bandit-report.json
- name : Run Safety
run : |
uv tool install safety
safety check --json
Container Scanning
## Scan Docker images
trivy image openfga/openfga:latest
trivy image bitnami/keycloak:latest
## Scan for vulnerabilities
grype mcp-server-langgraph:latest
Incident Response
Incident Response Plan
Detection
Monitor security alerts
Review audit logs
Analyze anomalies
Containment
Isolate affected systems
Revoke compromised credentials
Block malicious IPs
Investigation
Collect forensic data
Analyze attack vectors
Identify scope
Eradication
Remove malware
Patch vulnerabilities
Reset credentials
Recovery
Restore from backups
Verify system integrity
Resume operations
Lessons Learned
Document incident
Update procedures
Implement preventions
## security-contacts.yaml
security_team :
email : security@company.com
pagerduty : security-team
slack : #security-incidents
escalation :
level_1 : security-engineer
level_2 : security-manager
level_3 : ciso
Next Steps
Defense-in-Depth : Multi-layered security protects your MCP Server at every level!