Overview

Implement these security best practices to harden your MCP Server deployment for production. This guide covers configuration, deployment, and operational security.
Security is an ongoing process, not a one-time setup. Regularly review and update your security posture.

Authentication Best Practices

Use Strong Authentication

Always require multi-factor authentication in production:
# Keycloak configuration
keycloak:
  realm: langgraph
  mfa:
    required: true
    allowed_methods:
      - totp  # Authenticator app
      - webauthn  # Hardware keys
    grace_period: 0  # Enforce immediately
Verify MFA in the application:
@app.post("/message")
async def send_message(user: User = Depends(get_current_user)):
    if not user.mfa_verified:
        raise HTTPException(
            status_code=403,
            detail="MFA required for this operation"
        )
    # Process request
Configure password requirements:
# Keycloak password policy
password_policy:
  minimum_length: 12
  require_uppercase: true
  require_lowercase: true
  require_numbers: true
  require_special_chars: true
  password_history: 10
  max_age_days: 90
  min_age_days: 1
  not_username: true
  not_email: true
Password strength validation:
import re
from zxcvbn import zxcvbn

def validate_password(password: str, user_inputs: list) -> bool:
    # Check length
    if len(password) < 12:
        raise ValueError("Password must be at least 12 characters")

    # Check complexity
    if not re.search(r"[A-Z]", password):
        raise ValueError("Password must contain uppercase letter")
    if not re.search(r"[a-z]", password):
        raise ValueError("Password must contain lowercase letter")
    if not re.search(r"[0-9]", password):
        raise ValueError("Password must contain number")
    # Accept any non-alphanumeric character, not just a fixed handful of symbols
    if not re.search(r"[^A-Za-z0-9]", password):
        raise ValueError("Password must contain special character")

    # Check strength
    result = zxcvbn(password, user_inputs=user_inputs)
    if result['score'] < 3:
        raise ValueError("Password is too weak")

    return True
Configure secure JWT tokens:
# Short-lived access tokens
ACCESS_TOKEN_EXPIRE_MINUTES = 60  # 1 hour

# Longer refresh tokens
REFRESH_TOKEN_EXPIRE_DAYS = 7

# Use RS256 (asymmetric)
ALGORITHM = "RS256"

# Rotate signing keys monthly
KEY_ROTATION_DAYS = 30

# JWT configuration
jwt_config = {
    "algorithm": "RS256",
    "access_token_expire": 3600,
    "refresh_token_expire": 604800,
    "issuer": "https://keycloak.yourdomain.com",
    "audience": "mcp-server-langgraph",
    "verify_signature": True,
    "verify_exp": True,
    "verify_aud": True,
    "verify_iss": True
}
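The `verify_exp`, `verify_iss`, and `verify_aud` flags above map to claim checks that a JWT library runs after it has verified the signature. The following is a minimal sketch of those checks, assuming the payload has already been decoded and its signature verified. `validate_claims` is a hypothetical helper for illustration, not part of any library, and it is not a substitute for a maintained JWT implementation.

```python
import time
from typing import Optional

# Mirrors the issuer and audience values from jwt_config above.
EXPECTED_ISSUER = "https://keycloak.yourdomain.com"
EXPECTED_AUDIENCE = "mcp-server-langgraph"


def validate_claims(payload: dict, now: Optional[float] = None, leeway: int = 0) -> None:
    """Raise ValueError unless exp, iss, and aud are all acceptable.

    Hypothetical helper: assumes the token signature was already verified.
    """
    now = time.time() if now is None else now

    # verify_exp: the token must carry an expiry that has not passed
    exp = payload.get("exp")
    if exp is None or now > exp + leeway:
        raise ValueError("token expired or missing exp")

    # verify_iss: the issuer must match exactly
    if payload.get("iss") != EXPECTED_ISSUER:
        raise ValueError("unexpected issuer")

    # verify_aud: aud may be a single string or a list of strings
    aud = payload.get("aud")
    audiences = [aud] if isinstance(aud, str) else list(aud or [])
    if EXPECTED_AUDIENCE not in audiences:
        raise ValueError("unexpected audience")
```

A token that omits `aud` or `exp` is rejected rather than waved through, which is the behavior the `verify_*` flags are meant to guarantee.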
Secure session configuration:
# Session settings
SESSION_CONFIG = {
    "ttl": 3600,  # 1 hour
    "sliding_window": True,  # Extend on activity
    "max_concurrent_sessions": 3,  # Limit per user
    "secure": True,  # HTTPS only
    "httponly": True,  # No JS access
    "samesite": "strict",  # CSRF protection
    "session_fixation_protection": True,
    "regenerate_on_login": True
}

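# Implement session timeout
# Assumes each session is stored in Redis as JSON with a timezone-aware
# ISO 8601 "last_activity" field (redis.get returns a string, not a dict)
import json
from datetime import datetime, timedelta, timezone

async def check_session_activity(session_id: str):
    raw = await redis.get(f"session:{session_id}")
    if not raw:
        raise SessionExpired()

    session = json.loads(raw)
    last_activity = datetime.fromisoformat(session["last_activity"])
    if datetime.now(timezone.utc) - last_activity > timedelta(hours=1):
        await redis.delete(f"session:{session_id}")
        raise SessionExpired()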
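The `sliding_window` and `max_concurrent_sessions` settings are easier to reason about with a small model. This sketch uses an in-memory store as a hypothetical stand-in for Redis: each successful `touch` pushes the expiry forward by the TTL, and creating a session beyond the per-user cap evicts the oldest one. Evicting the oldest session is an assumption here; rejecting the new login instead is an equally valid policy.

```python
import time
from collections import defaultdict

SESSION_TTL = 3600           # seconds, matches "ttl" above
MAX_CONCURRENT_SESSIONS = 3  # matches "max_concurrent_sessions" above


class InMemorySessionStore:
    """Hypothetical stand-in for a Redis-backed store, for illustration only."""

    def __init__(self):
        self._expires = {}                 # session_id -> expiry timestamp
        self._by_user = defaultdict(list)  # user_id -> session ids, oldest first

    def create(self, user_id, session_id, now=None):
        now = time.time() if now is None else now
        sessions = self._by_user[user_id]
        # Expired sessions must not count toward the concurrency cap
        sessions[:] = [s for s in sessions if self._expires.get(s, 0) > now]
        # Evict the oldest sessions until there is room for the new one
        while len(sessions) >= MAX_CONCURRENT_SESSIONS:
            self._expires.pop(sessions.pop(0), None)
        sessions.append(session_id)
        self._expires[session_id] = now + SESSION_TTL

    def touch(self, session_id, now=None):
        """Return True and extend the TTL if the session is live, else False."""
        now = time.time() if now is None else now
        expiry = self._expires.get(session_id)
        if expiry is None or expiry <= now:
            self._expires.pop(session_id, None)
            return False
        self._expires[session_id] = now + SESSION_TTL  # sliding window
        return True
```

With a sliding window, an active user is never logged out mid-task, while an idle session still expires one TTL after its last request.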

Authorization Best Practices

Principle of Least Privilege

## Default deny, explicit allow
DEFAULT_PERMISSIONS = {
    "user": ["viewer"],  # Minimal by default
    "admin": ["admin", "member", "viewer"]
}

## Grant minimum required permissions
async def onboard_user(user_id: str, role: str = "user"):
    """Add user with minimal permissions"""

    # Default viewer access only
    await openfga_client.write_tuples([{
        "user": f"user:{user_id}",
        "relation": "viewer",
        "object": "organization:default"
    }])

    # Require explicit grant for elevated access
    if role == "admin":
        # Requires admin approval
        await request_admin_approval(user_id)

Regular Permission Audits

#!/usr/bin/env python3
## audit-permissions.py

async def audit_permissions():
    """Monthly permission audit"""

    # Find users with admin access
    admins = await openfga_client.list_users(
        relation="admin",
        object="organization:default"
    )

    # Check if still needed
    for admin in admins:
        # Get user info
        user = await get_user(admin)

        # Check last activity
        if user.last_login < datetime.utcnow() - timedelta(days=90):
            logger.warning(f"Inactive admin: {admin}")
            await notify_security_team(f"Remove admin access for {admin}?")

    # Find overly permissive tuples
    all_tuples = await openfga_client.read_tuples()
    # Collect each organization once so it is not re-checked for every owner tuple
    # (also avoids shadowing the built-in name `tuple`)
    owned_orgs = {
        t['object'] for t in all_tuples
        if t['relation'] == 'owner' and t['object'].startswith('organization:')
    }
    for org in owned_orgs:
        # Multiple org owners is risky
        owners = await openfga_client.list_users(
            relation="owner",
            object=org
        )
        if len(owners) > 3:
            logger.warning(f"Too many owners for {org}: {owners}")

Resource Isolation

## Enforce organization boundaries
async def check_resource_access(
    user_id: str,
    resource_id: str,
    action: str
):
    """Ensure users can only access resources in their org"""

    # Get user's organizations
    user_orgs = await openfga_client.list_objects(
        user=f"user:{user_id}",
        relation="member",
        object_type="organization"
    )

    # Get resource's organization
    resource_org = await get_resource_organization(resource_id)

    # Check org membership
    if resource_org not in user_orgs:
        logger.security(
            "cross_org_access_attempt",
            user_id=user_id,
            resource_id=resource_id,
            user_orgs=user_orgs,
            resource_org=resource_org
        )
        raise PermissionError("Cross-organization access denied")

    # Check specific permission
    allowed = await openfga_client.check_permission(
        user=f"user:{user_id}",
        relation=action,
        object=resource_id
    )

    if not allowed:
        raise PermissionError(f"Action '{action}' not allowed")

    return True

Data Security Best Practices

Encryption

  • At Rest
  • In Transit
  • Application-Level
PostgreSQL Encryption:
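-- Require TLS for client connections. These settings protect data in transit;
-- PostgreSQL has no built-in transparent data encryption, so pair them with
-- encrypted storage (see the PVC example below) for data at rest.
ALTER SYSTEM SET ssl = on;
ALTER SYSTEM SET ssl_ciphers = 'HIGH:!aNULL:!MD5';
ALTER SYSTEM SET ssl_prefer_server_ciphers = on;
ALTER SYSTEM SET ssl_min_protocol_version = 'TLSv1.3';

-- Column-level encryption for sensitive data
CREATE EXTENSION IF NOT EXISTS pgcrypto;

CREATE TABLE users (
    id UUID PRIMARY KEY,
    email_encrypted BYTEA NOT NULL  -- store only the ciphertext
);

-- Encrypt on write: a column DEFAULT cannot reference another column
INSERT INTO users (id, email_encrypted)
VALUES (
    gen_random_uuid(),
    pgp_sym_encrypt('user@example.com', current_setting('app.encryption_key'))
);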
Redis Encryption:
# redis.conf
tls-port 6380
tls-cert-file /etc/redis/certs/redis.crt
tls-key-file /etc/redis/certs/redis.key
tls-ca-cert-file /etc/redis/certs/ca.crt
tls-auth-clients yes
Kubernetes PVC Encryption:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: postgres-data
spec:
  storageClassName: encrypted-gp3
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi

Data Minimization

## Only collect necessary data
class MessageRequest(BaseModel):
    query: str  # Required
    # Don't collect: IP, user agent, etc. unless needed

## Automatic PII removal
def sanitize_logs(log_data: dict) -> dict:
    """Remove PII from logs"""
    pii_fields = ['email', 'phone', 'ssn', 'credit_card']

    sanitized = log_data.copy()
    for field in pii_fields:
        if field in sanitized:
            sanitized[field] = '***REDACTED***'

    return sanitized

## Data retention policies
async def enforce_retention_policy():
    """Delete old data according to policy"""

    # Delete conversations older than 90 days
    cutoff = datetime.utcnow() - timedelta(days=90)
    await db.delete_conversations(older_than=cutoff)

    # Anonymize audit logs older than 1 year
    log_cutoff = datetime.utcnow() - timedelta(days=365)
    await db.anonymize_audit_logs(older_than=log_cutoff)
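`sanitize_logs` above redacts only top-level keys, so PII nested inside a sub-dictionary or a list would still reach the logs. A possible extension is sketched below; `sanitize_nested` is a hypothetical name, and the field list is the same one used above.

```python
from typing import Any

PII_FIELDS = {"email", "phone", "ssn", "credit_card"}
REDACTED = "***REDACTED***"


def sanitize_nested(value: Any) -> Any:
    """Return a redacted copy of value; the input is not modified."""
    if isinstance(value, dict):
        return {
            key: REDACTED
            if isinstance(key, str) and key.lower() in PII_FIELDS
            else sanitize_nested(item)
            for key, item in value.items()
        }
    if isinstance(value, (list, tuple)):
        # Rebuild the same container type with sanitized elements
        return type(value)(sanitize_nested(item) for item in value)
    return value
```

Key matching is case-insensitive, so `Email` and `EMAIL` are redacted as well. Values that contain PII under an unrelated key name (for example a free-text `message` field) are not detected by this approach.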

Secure Data Deletion

async def secure_delete_user(user_id: str):
    """GDPR-compliant user deletion"""

    # 1. Delete user data (dependent rows first, so foreign keys to users are not violated)
    await db.execute("DELETE FROM sessions WHERE user_id = %s", (user_id,))
    await db.execute("DELETE FROM conversations WHERE user_id = %s", (user_id,))
    await db.execute("DELETE FROM users WHERE id = %s", (user_id,))

    # 2. Remove from authorization system
    tuples = await openfga_client.read_tuples(user=f"user:{user_id}")
    await openfga_client.delete_tuples(tuples)

    # 3. Anonymize audit logs (keep for compliance)
    await db.execute(
        "UPDATE audit_logs SET user_id = 'deleted_user' WHERE user_id = %s",
        (user_id,)
    )

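    # 4. Clear caches (DEL does not expand glob patterns, so iterate over matching keys)
    async for key in redis.scan_iter(match=f"user:{user_id}:*"):
        await redis.delete(key)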

    # 5. Log deletion
    logger.audit("user_deleted", user_id=user_id, timestamp=datetime.utcnow())

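    # 6. Reclaim storage (defense in depth)
    # VACUUM FULL rewrites the table so deleted rows no longer remain in its data files.
    # It takes an exclusive lock and cannot run inside a transaction block.
    await db.execute("VACUUM FULL users")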

Network Security Best Practices

Network Policies

## Deny all by default
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: default-deny-all
  namespace: mcp-server-langgraph
spec:
  podSelector: {}
  policyTypes:
  - Ingress
  - Egress

---
## Allow only necessary traffic
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: mcp-server-langgraph-policy
  namespace: mcp-server-langgraph
spec:
  podSelector:
    matchLabels:
      app: mcp-server-langgraph
  policyTypes:
  - Ingress
  - Egress

  ingress:
  # Allow from ingress controller
  - from:
    - namespaceSelector:
        matchLabels:
          name: ingress-nginx
    ports:
    - protocol: TCP
      port: 8000

  egress:
  # Allow to PostgreSQL
  - to:
    - podSelector:
        matchLabels:
          app: postgres
    ports:
    - protocol: TCP
      port: 5432

  # Allow to Redis
  - to:
    - podSelector:
        matchLabels:
          app: redis
    ports:
    - protocol: TCP
      port: 6379

  # Allow to Keycloak
  - to:
    - podSelector:
        matchLabels:
          app: keycloak
    ports:
    - protocol: TCP
      port: 8080

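  # Allow DNS (namespace AND pod selector in a single peer; separate list items would be OR-ed)
  - to:
    - namespaceSelector:
        matchLabels:
          name: kube-system
      podSelector:
        matchLabels:
          k8s-app: kube-dns
    ports:
    - protocol: UDP
      port: 53
    - protocol: TCP
      port: 53

  # Allow HTTPS to external services (LLM APIs)
  # ipBlock matches external addresses; a namespaceSelector only matches in-cluster pods
  - to:
    - ipBlock:
        cidr: 0.0.0.0/0
    ports:
    - protocol: TCP
      port: 443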

Rate Limiting

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

## Initialize limiter
limiter = Limiter(
    key_func=get_remote_address,
    default_limits=["100/minute"],
    storage_uri="redis://redis:6379"
)

app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

## Per-endpoint limits
@app.post("/message")
@limiter.limit("30/minute")  # More restrictive for expensive operations
async def send_message(request: Request):
    pass

@app.post("/auth/login")
@limiter.limit("5/minute")  # Prevent brute force
async def login(request: Request):
    pass

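## Per-user limits (an alternative to the per-IP limit on /message above; register only one)
def user_key(request: Request) -> str:
    # Assumes an auth middleware has stored the user on request.state; falls back to client IP
    user = getattr(request.state, "user", None)
    return str(user.id) if user else get_remote_address(request)

@app.post("/message")
@limiter.limit("100/minute", key_func=user_key)
async def send_message(request: Request, user: User = Depends(get_current_user)):
    pass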
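To make a limit such as `5/minute` concrete, here is a minimal sliding-window log limiter using only the standard library. It is an illustration of the idea, not the algorithm slowapi uses internally, and `SlidingWindowLimiter` is a hypothetical class. State is held in process memory, so it would not work across multiple replicas the way the Redis-backed limiter above does.

```python
import time
from collections import defaultdict, deque
from typing import Optional


class SlidingWindowLimiter:
    """Allow at most `limit` requests per key within any `window_seconds` span."""

    def __init__(self, limit: int, window_seconds: float):
        self.limit = limit
        self.window = window_seconds
        self._hits = defaultdict(deque)  # key -> timestamps of accepted requests

    def allow(self, key: str, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        hits = self._hits[key]
        # Forget requests that have fallen out of the window
        while hits and hits[0] <= now - self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True


if __name__ == "__main__":
    login_limiter = SlidingWindowLimiter(limit=5, window_seconds=60)
    for attempt in range(7):
        print(attempt, login_limiter.allow("203.0.113.7"))
```

Only accepted requests are recorded, so a client that keeps hammering a blocked endpoint does not extend its own lockout; recording rejected attempts too would be the stricter variant.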

DDoS Protection

## CloudFlare / AWS WAF rules
waf_rules:
  - name: rate_limit_api
    priority: 1
    rule:
      rate_limit:
        limit: 2000
        window: 300  # 5 minutes
        action: block

  - name: geo_blocking
    priority: 2
    rule:
      geo_match:
        block_countries:
          - XX  # Known malicious sources
          - YY

  - name: ip_reputation
    priority: 3
    rule:
      ip_reputation:
        block_high_risk: true
        block_anonymous_proxies: true
        block_tor_exit_nodes: true

  - name: sql_injection_protection
    priority: 4
    rule:
      sql_injection:
        sensitivity: high

Container Security Best Practices

Secure Base Images

## Use minimal, verified base images
FROM python:3.12-slim-bookworm AS base

## Run as non-root (UID/GID 1000 matches runAsUser in the pod security context)
RUN groupadd -r -g 1000 appuser && useradd -r -u 1000 -g appuser appuser

## Install security updates
RUN apt-get update && \
    apt-get upgrade -y && \
    apt-get install -y --no-install-recommends \
        ca-certificates \
        && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

## Copy application
WORKDIR /app
COPY --chown=appuser:appuser . .

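## Install dependencies
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv
RUN uv sync --frozen
## Put the virtual environment created by uv sync first on PATH so `python` resolves to it
ENV PATH="/app/.venv/bin:$PATH"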

## Switch to non-root user
USER appuser

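## Health check (standard library only; urlopen raises on connection errors and on 4xx/5xx responses)
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=2)"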

## Run application
CMD ["python", "-m", "uvicorn", "mcp_server_langgraph.main:app", "--host", "0.0.0.0", "--port", "8000"]

Security Context

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-server-langgraph
spec:
  template:
    spec:
      # Pod security
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000
        seccompProfile:
          type: RuntimeDefault

      containers:
      - name: agent
        image: mcp-server-langgraph:latest

        # Container security
        securityContext:
          allowPrivilegeEscalation: false
          readOnlyRootFilesystem: true
          runAsNonRoot: true
          runAsUser: 1000
          capabilities:
            drop:
            - ALL

        # Resource limits (prevent DoS)
        resources:
          requests:
            cpu: 1000m
            memory: 1Gi
          limits:
            cpu: 4000m
            memory: 4Gi

        # Read-only filesystem with specific writable paths
        volumeMounts:
        - name: tmp
          mountPath: /tmp
        - name: cache
          mountPath: /.cache

      volumes:
      - name: tmp
        emptyDir: {}
      - name: cache
        emptyDir: {}

Image Scanning

## .github/workflows/security.yml
name: Security Scan
on: [push, pull_request]

jobs:
  scan:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4

    - name: Build image
      run: docker build -t mcp-server-langgraph:${{ github.sha }} .

    - name: Run Trivy scan
      uses: aquasecurity/trivy-action@master
      with:
        image-ref: mcp-server-langgraph:${{ github.sha }}
        format: 'sarif'
        output: 'trivy-results.sarif'
        severity: 'CRITICAL,HIGH'

    - name: Upload results
      if: always()  # Upload the report even if an earlier step failed
      uses: github/codeql-action/upload-sarif@v3
      with:
        sarif_file: 'trivy-results.sarif'

    - name: Fail on critical vulnerabilities
      uses: aquasecurity/trivy-action@master
      with:
        image-ref: mcp-server-langgraph:${{ github.sha }}
        exit-code: '1'
        severity: 'CRITICAL'

Secret Management Best Practices

Never Commit Secrets

## .gitignore
.env
.env.*
*.pem
*.key
credentials.json
secrets.yaml
config/secrets/

## Use git-secrets to prevent accidental commits
git secrets --install
git secrets --register-aws
git secrets --add 'sk-[a-zA-Z0-9]{48}'  # OpenAI keys
git secrets --add 'xoxb-[0-9]{12}-[0-9]{12}-[a-zA-Z0-9]{24}'  # Slack tokens

Use Secret Managers

## Use Infisical, not environment variables
import os

from mcp_server_langgraph.core.config import settings

## BAD - hardcoded
API_KEY = "sk-1234567890abcdef"

## BAD - environment variable (better but not ideal)
API_KEY = os.getenv("API_KEY")

## GOOD - secret manager
API_KEY = settings.anthropic_api_key  # Loaded from Infisical

Rotate Secrets Regularly

## Automated secret rotation
from datetime import datetime

from cryptography.hazmat.primitives.asymmetric import rsa

async def rotate_jwt_secret():
    """Rotate JWT signing key monthly"""

    # Generate new key
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=4096
    )

    # Store in Infisical
    await infisical_client.create_secret(
        secret_name=f"JWT_PRIVATE_KEY_{datetime.now().strftime('%Y%m')}",
        secret_value=private_key.private_bytes(...).decode(),
        project_id=settings.infisical_project_id,
        environment="production"
    )

    # Update application config
    await update_config("JWT_KEY_ID", f"key-{datetime.now().strftime('%Y%m')}")

    # Keep old key for 24h for token validation
    await schedule_key_deletion(old_key_id, delay_hours=24)

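## Schedule rotation
## The schedule library has no monthly unit and does not await coroutines,
## so run every 30 days and drive the coroutine with asyncio.run
import asyncio

schedule.every(30).days.do(lambda: asyncio.run(rotate_jwt_secret()))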
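During the grace period after a rotation, verifiers have to accept tokens signed with either the new or the previous key. A minimal sketch of that decision, assuming key IDs follow the `key-YYYYMM` naming used above and that at most one rotation happens per calendar month. `key_id_for` and `accepted_key_ids` are hypothetical helpers.

```python
from datetime import datetime, timedelta, timezone

GRACE_PERIOD = timedelta(hours=24)  # matches delay_hours=24 above


def key_id_for(moment: datetime) -> str:
    """Key ID for the month containing `moment`, e.g. key-202503."""
    return f"key-{moment.strftime('%Y%m')}"


def accepted_key_ids(now: datetime, rotated_at: datetime) -> set:
    """Key IDs a verifier should accept at `now`, given the last rotation time."""
    accepted = {key_id_for(rotated_at)}
    if now - rotated_at < GRACE_PERIOD:
        # The previous key belongs to the month before the rotation month
        last_day_of_previous_month = rotated_at.replace(day=1) - timedelta(days=1)
        accepted.add(key_id_for(last_day_of_previous_month))
    return accepted


if __name__ == "__main__":
    rotated = datetime(2025, 3, 1, tzinfo=timezone.utc)
    print(accepted_key_ids(rotated + timedelta(hours=12), rotated))
    print(accepted_key_ids(rotated + timedelta(hours=36), rotated))
```

The grace period should be at least as long as the longest-lived token signed with the old key. With one-hour access tokens, 24 hours is ample; if refresh tokens valid for seven days were signed with the same key, the old key would need to be kept for at least that long.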

Monitoring & Alerting

Security Metrics

## Failed authentication attempts
rate(auth_failed_attempts_total[5m]) > 10

## Permission denials
rate(authorization_denied_total[5m]) > 50

## Suspicious activity
sum(rate(http_requests_total{status=~"4..|5.."}[5m])) > 100

## Token validation failures
rate(jwt_validation_failed_total[1m]) > 5
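PromQL's `rate()` returns a per-second average over the window, so the thresholds above are stricter than they may look: `rate(...[5m]) > 10` fires only after more than 3,000 events in five minutes. The sketch below shows the arithmetic with a simplified model; `approx_rate` and `events_needed` are hypothetical helpers, and the model ignores the counter-reset handling and extrapolation that Prometheus applies.

```python
def approx_rate(samples: list) -> float:
    """Per-second increase between the first and last (timestamp, value) sample.

    Simplified model of PromQL rate(): no counter-reset handling, no extrapolation.
    """
    (t0, v0), (t1, v1) = samples[0], samples[-1]
    if t1 <= t0:
        raise ValueError("samples must span a positive time range")
    return (v1 - v0) / (t1 - t0)


def events_needed(threshold_per_second: float, window_seconds: float) -> float:
    """How many events in the window are required to reach the threshold."""
    return threshold_per_second * window_seconds


if __name__ == "__main__":
    # Counter went from 100 to 3400 over five minutes
    print(approx_rate([(0, 100), (300, 3400)]))
    print(events_needed(10, 300))
```

If the intent is to alert on a few dozen failed logins per minute, a threshold expressed in events per window (for example with `increase(...[5m])`) may be closer to what is wanted than a per-second rate of 10.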

Security Alerts

## prometheus-alerts.yaml
groups:
- name: security_alerts
  rules:
  - alert: BruteForceAttack
    expr: rate(auth_failed_attempts_total[5m]) > 10
    for: 2m
    annotations:
      summary: "Potential brute force attack detected"
      description: "{{ $value }} failed login attempts per second"

  - alert: UnauthorizedAccessAttempts
    expr: rate(authorization_denied_total[5m]) > 50
    for: 5m
    annotations:
      summary: "High rate of authorization failures"

  - alert: SuspiciousAPIUsage
    expr: sum(rate(http_requests_total{status="403"}[5m])) > 20
    for: 5m
    annotations:
      summary: "Unusual number of forbidden requests"

  - alert: TokenValidationFailures
    expr: rate(jwt_validation_failed_total[1m]) > 5
    for: 1m
    annotations:
      summary: "Token validation failures detected"

Compliance Checklist

  • Data processing agreement
  • Privacy policy published
  • Consent management
  • Right to access implemented
  • Right to erasure implemented
  • Data portability
  • Breach notification procedure
  • Data Protection Officer assigned
  • DPIA completed
  • Access controls documented
  • Change management process
  • Incident response plan
  • Business continuity plan
  • Vendor management
  • Risk assessment
  • Security awareness training
  • Audit logging enabled
  • Penetration testing
  • BAA with vendors
  • Encryption at rest
  • Encryption in transit
  • Access controls
  • Audit trails
  • Breach notification
  • Risk analysis
  • Security officer assigned
  • Cardholder data isolated
  • Strong cryptography
  • Secure network architecture
  • Access control measures
  • Network monitoring
  • Vulnerability management
  • Security testing
  • Information security policy

Security Testing

Penetration Testing

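## OWASP ZAP automated scan
## (the zap2docker images are deprecated; the volume mount lets the report be written to the host)
docker run -v "$(pwd):/zap/wrk/:rw" -t ghcr.io/zaproxy/zaproxy:stable zap-baseline.py \
  -t https://api.yourdomain.com \
  -r zap-report.html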

## Manual testing checklist
## 1. Authentication bypass
## 2. Authorization flaws
## 3. SQL injection
## 4. XSS
## 5. CSRF
## 6. API abuse
## 7. Session management
## 8. Cryptography issues

Security Code Review

## .github/workflows/security-review.yml
name: Security Code Review
on: [pull_request]

jobs:
  security:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4

    - name: Bandit security scan
      run: |
        uv tool install bandit
        bandit -r src/ -f json -o bandit-report.json

    - name: Safety dependency check
      run: |
        uv tool install safety
        safety check --json > safety-report.json

    - name: Semgrep scan
      run: |
        uv tool install semgrep
        semgrep --config=auto --json -o semgrep-report.json

Security First: Follow these best practices to protect your MCP Server and user data!