Authentication Migration Guide v2.7.0 → v2.8.0

Current Documentation: This guide is for v2.8.0 (latest release, 2025-10-22).
Breaking Change: Starting with v2.8.0, token-based authentication is required for all tool calls.

Summary of Changes

What Changed?

Before (v2.7.0 and earlier):
# Tools accepted bare user_id without credentials
result = await call_tool("agent_chat", {
    "message": "Hello!",
    "user_id": "user:alice"  # No authentication
})
After (v2.8.0+):
# 1. First, obtain a token via login
login_response = await client.post("/auth/login", json={
    "username": "alice",
    "password": "alice123"
})
token = login_response.json()["access_token"]

# 2. Include token in all tool calls
result = await call_tool("agent_chat", {
    "message": "Hello!",
    "token": token,  # Required JWT token
    "user_id": "user:alice"
})

Why This Change?

The previous implementation had a critical security vulnerability:
  • Tools accepted bare user_id without verifying credentials
  • AuthMiddleware.authenticate() granted access without password validation
  • Any actor who guessed a username could impersonate that user
Security Impact: High - Complete authentication bypass

Migration Steps

For HTTP/StreamableHTTP Clients

Step 1: Add Login Flow
import httpx

async def get_auth_token(base_url: str, username: str, password: str) -> str:
    """Obtain JWT token from login endpoint"""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{base_url}/auth/login",
            json={"username": username, "password": password}
        )
        response.raise_for_status()

        data = response.json()
        return data["access_token"]

# Usage
token = await get_auth_token(
    "http://localhost:8000",
    username="alice",
    password="alice123"
)
Step 2: Update Tool Call Requests
Before:
request = {
    "jsonrpc": "2.0",
    "method": "tools/call",
    "params": {
        "name": "agent_chat",
        "arguments": {
            "message": "Hello!",
            "user_id": "user:alice"  # ❌ Missing token
        }
    }
}
After:
request = {
    "jsonrpc": "2.0",
    "method": "tools/call",
    "params": {
        "name": "agent_chat",
        "arguments": {
            "message": "Hello!",
            "token": token,  # ✅ Required
            "user_id": "user:alice"
        }
    }
}
Step 3: Handle Token Expiration
import datetime

import httpx


class MCPAuthClient:
    """HTTP client with automatic token refresh"""

    def __init__(self, base_url: str, username: str, password: str):
        self.base_url = base_url
        self.username = username
        self.password = password
        self.token = None
        self.token_expiry = None

    async def ensure_token(self):
        """Ensure we have a valid token"""
        now = datetime.datetime.now(datetime.timezone.utc)

        # Get new token if expired or missing
        if not self.token or (self.token_expiry and now >= self.token_expiry):
            await self.login()

    async def login(self):
        """Login and store token"""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/auth/login",
                json={"username": self.username, "password": self.password}
            )
            response.raise_for_status()

            data = response.json()
            self.token = data["access_token"]

            # Calculate expiry from the current time (subtract 60s buffer)
            expires_in = data.get("expires_in", 3600)
            now = datetime.datetime.now(datetime.timezone.utc)
            self.token_expiry = now + datetime.timedelta(seconds=expires_in - 60)

    async def call_tool(self, name: str, arguments: dict):
        """Call tool with automatic token management"""
        await self.ensure_token()

        # Add token to arguments
        arguments_with_token = {**arguments, "token": self.token}

        request = {
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {
                "name": name,
                "arguments": arguments_with_token
            }
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/message",
                json=request,
                timeout=30.0
            )
            response.raise_for_status()
            return response.json()

# Usage
client = MCPAuthClient(
    "http://localhost:8000",
    username="alice",
    password="alice123"
)

# Token is automatically managed
result = await client.call_tool("agent_chat", {
    "message": "Hello!",
    "user_id": "user:alice"
})
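
If the login response omits `expires_in`, the 3600-second fallback may not match the token's real lifetime. One hedged alternative is to read the `exp` claim from the token itself, assuming the server issues standard JWTs that carry `exp`, as the payload shown under Troubleshooting suggests. The helper name `expiry_from_jwt` is hypothetical. The payload is decoded without signature verification, which is acceptable only for scheduling a client-side refresh.

```python
import base64
import datetime
import json
from typing import Optional


def expiry_from_jwt(token: str, buffer_seconds: int = 60) -> Optional[datetime.datetime]:
    """Return the refresh deadline (exp minus buffer) in UTC, or None if unknown."""
    parts = token.split(".")
    if len(parts) != 3:
        return None
    payload_b64 = parts[1]
    try:
        # JWT segments are base64url without padding; restore it before decoding
        padded = payload_b64 + "=" * (-len(payload_b64) % 4)
        claims = json.loads(base64.urlsafe_b64decode(padded))
    except ValueError:
        return None
    exp = claims.get("exp") if isinstance(claims, dict) else None
    if not isinstance(exp, (int, float)):
        return None
    expires_at = datetime.datetime.fromtimestamp(exp, tz=datetime.timezone.utc)
    return expires_at - datetime.timedelta(seconds=buffer_seconds)
```

In `login()`, the result could replace the computed `token_expiry` whenever it is not None, keeping the `expires_in` calculation as a fallback.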

For stdio Clients

A stdio session has no HTTP login endpoint of its own, so stdio clients must obtain tokens out of band:
Option 1: Use HTTP endpoint separately
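# 1. Get token via HTTP
import requests
from mcp import ClientSession
from mcp.client.stdio import stdio_client

response = requests.post("http://localhost:8000/auth/login", json={
    "username": "alice",
    "password": "alice123"
})
response.raise_for_status()
token = response.json()["access_token"]

# 2. Use token with stdio client (server_params is your StdioServerParameters)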
async with stdio_client(server_params) as (read, write):
    async with ClientSession(read, write) as session:
        await session.initialize()

        result = await session.call_tool("agent_chat", arguments={
            "message": "Hello!",
            "token": token,
            "user_id": "user:alice"
        })
Option 2: Create token programmatically (dev/test only)
from mcp_server_langgraph.auth.factory import create_auth_middleware
from mcp_server_langgraph.core.config import settings

# Create auth middleware
auth = create_auth_middleware(settings, openfga_client=None)

# Create token programmatically
token = auth.create_token("alice", expires_in=3600)

# Use token with stdio client
result = await session.call_tool("agent_chat", arguments={
    "message": "Hello!",
    "token": token,
    "user_id": "user:alice"
})

Updated Tool Schemas

agent_chat

{
  "message": string,        // Required: User message
  "token": string,          // ✅ NEW: Required JWT token
  "user_id": string,        // Required: User identifier
  "thread_id"?: string,     // Optional: Thread ID
  "response_format"?: "concise" | "detailed"  // Optional: Response format
}

conversation_get

{
  "thread_id": string,      // Required: Thread identifier
  "token": string,          // ✅ NEW: Required JWT token
  "user_id": string         // Required: User identifier
}

{
  "query": string,          // Required: Search query
  "token": string,          // ✅ NEW: Required JWT token
  "user_id": string,        // Required: User identifier
  "limit"?: number          // Optional: Max results (default: 10)
}
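
A client can catch a missing `token` before the request is sent. The sketch below records the required fields from the schemas above in a lookup table. The helper `missing_fields` is hypothetical, and only the two tools named in this guide are listed.

```python
REQUIRED_FIELDS = {
    "agent_chat": ("message", "token", "user_id"),
    "conversation_get": ("thread_id", "token", "user_id"),
}


def missing_fields(tool_name: str, arguments: dict) -> list[str]:
    """Return required fields that are absent or empty for a known tool."""
    required = REQUIRED_FIELDS.get(tool_name)
    if required is None:
        raise KeyError(f"No schema recorded for tool {tool_name!r}")
    return [field for field in required if not arguments.get(field)]
```

Calling `missing_fields("agent_chat", {"message": "Hello!", "user_id": "user:alice"})` returns `["token"]`, which is the v2.7.0-style call that v2.8.0 rejects.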

Default Credentials (Development Only)

For development and testing, the InMemoryUserProvider has these default users:
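| Username | Password | Roles         | Notes                 |
|----------|----------|---------------|-----------------------|
| alice    | alice123 | user, premium | Standard premium user |
| bob      | bob123   | user          | Standard user         |
| admin    | admin123 | admin         | Administrator         |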
⚠️ WARNING: These credentials are stored in plaintext and are INSECURE. Use them only for development and testing.
For production:
  • Use KeycloakUserProvider with proper SSO/OIDC
  • Or implement password hashing in InMemoryUserProvider

Production Deployment

  1. Configure Keycloak:
    # Set environment variables
    export AUTH_PROVIDER=keycloak
    export KEYCLOAK_SERVER_URL=https://auth.example.com
    export KEYCLOAK_REALM=production
    export KEYCLOAK_CLIENT_ID=mcp-server
    export KEYCLOAK_CLIENT_SECRET=<secret>
    
  2. Obtain Tokens from Keycloak:
    # Clients obtain tokens from Keycloak directly
    # Tokens are validated by the MCP server
    
    # The server will verify Keycloak JWT tokens automatically
    
  3. No code changes needed - the auth factory automatically uses KeycloakUserProvider
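
Step 2 leaves the token request itself to the client. As a rough sketch, Keycloak exposes a standard OpenID Connect token endpoint under `/realms/<realm>/protocol/openid-connect/token` (older Keycloak distributions prefix this path with `/auth`). The helper below only builds the URL and form body for a client-credentials grant. The function name is hypothetical, and which grant type your realm permits is an assumption to confirm with your Keycloak administrator.

```python
from urllib.parse import quote, urlencode


def keycloak_token_request(
    server_url: str, realm: str, client_id: str, client_secret: str
) -> tuple[str, bytes]:
    """Return (url, form_body) for an OAuth2 client-credentials token request."""
    base = server_url.rstrip("/")
    url = f"{base}/realms/{quote(realm, safe='')}/protocol/openid-connect/token"
    body = urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }).encode("ascii")
    return url, body
```

POST the body with `Content-Type: application/x-www-form-urlencoded`. The JSON response should carry an `access_token` field, whose value goes into the `token` argument of tool calls.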

Custom Authentication Provider

Implement the UserProvider interface:
from mcp_server_langgraph.auth.user_provider import (
    AuthResponse,
    TokenVerification,
    UserProvider,
)

class CustomUserProvider(UserProvider):
    async def authenticate(self, username: str, password: str) -> AuthResponse:
        # Your custom authentication logic
        # - Validate credentials against your database
        # - Return AuthResponse with user info
        pass

    async def verify_token(self, token: str) -> TokenVerification:
        # Your custom token verification
        # - Validate JWT signature
        # - Check expiration
        # - Return TokenVerification
        pass

    # Implement other required methods...
Register your provider in auth/factory.py:
def create_user_provider(settings, openfga_client):
    provider_type = settings.auth_provider.lower()

    if provider_type == "custom":
        return CustomUserProvider(config=settings.custom_config)
    # ... existing providers

Testing Migration

Update Unit Tests

import pytest


@pytest.fixture
def auth_token():
    """Get valid auth token for tests"""
    from mcp_server_langgraph.auth.factory import create_auth_middleware
    from mcp_server_langgraph.core.config import settings

    auth = create_auth_middleware(settings, openfga_client=None)
    return auth.create_token("alice", expires_in=3600)


@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_tool_call_with_token(auth_token):
    """Test tool call with valid token"""
    response = await call_tool("agent_chat", {
        "message": "Hello!",
        "token": auth_token,
        "user_id": "user:alice"
    })
    assert response.status_code == 200

Integration Tests

class TestAuthMigration:
    def test_login_endpoint(self):
        """Test login endpoint returns token"""
        response = client.post("/auth/login", json={
            "username": "alice",
            "password": "alice123"
        })

        assert response.status_code == 200
        assert "access_token" in response.json()
        assert "token_type" in response.json()
        assert response.json()["token_type"] == "bearer"

    def test_tool_call_without_token_fails(self):
        """Test tool call without token is rejected"""
        response = client.post("/message", json={
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {
                "name": "agent_chat",
                "arguments": {
                    "message": "Hello!",
                    "user_id": "user:alice"
                    # Missing token
                }
            }
        })

        assert response.status_code in [200, 401, 403]
        data = response.json()
        assert "error" in data or response.status_code != 200

    def test_tool_call_with_valid_token_succeeds(self, auth_token):
        """Test tool call with valid token succeeds"""
        response = client.post("/message", json={
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {
                "name": "agent_chat",
                "arguments": {
                    "message": "Hello!",
                    "token": auth_token,
                    "user_id": "user:alice"
                }
            }
        })

        assert response.status_code == 200
        assert "result" in response.json()
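
The rejection test accepts HTTP 200 because a JSON-RPC server may report failures in the response body instead of through the HTTP status. A small helper can make that intent explicit. `is_rejected` is a hypothetical name, and the error shape is an assumption based on the JSON-RPC 2.0 `error` member, not on this server's documented responses.

```python
def is_rejected(status_code: int, body: dict) -> bool:
    """True if the call was refused at the HTTP layer or the JSON-RPC layer."""
    if status_code in (401, 403):
        return True
    # JSON-RPC errors can arrive with HTTP 200 and an "error" member
    return status_code == 200 and "error" in body
```

Unlike the combined assertion in the test, this helper does not treat a 500 response as a rejection, so a server crash is not mistaken for a working auth check.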

Troubleshooting

Error: “Authentication token required”

Cause: The tool call is missing the token field
Fix: Add the token parameter to the tool call arguments
# ❌ Wrong
arguments = {"message": "Hello!", "user_id": "user:alice"}

# ✅ Correct
arguments = {"message": "Hello!", "token": token, "user_id": "user:alice"}

Error: “Invalid authentication token”

Causes:
  1. Token expired
  2. Token signed with wrong secret
  3. Malformed token
Fix:
  1. Re-login to get fresh token
  2. Ensure JWT_SECRET_KEY matches between token creation and verification
  3. Check token format (should be eyJ...)
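
The causes can be narrowed down on the client before logging in again. The sketch below inspects a token's structure and `exp` claim without verifying the signature, since verification needs the server's secret. It can therefore tell malformed and expired tokens apart, but it cannot detect a token signed with the wrong secret. `diagnose_token` is a hypothetical helper.

```python
import base64
import json
import time
from typing import Optional


def diagnose_token(token: str, now: Optional[float] = None) -> str:
    """Classify a JWT as 'malformed', 'expired', or 'plausible' (signature unchecked)."""
    parts = token.split(".")
    if len(parts) != 3 or not all(parts):
        return "malformed"
    try:
        # Restore the base64url padding that compact JWTs strip
        padded = parts[1] + "=" * (-len(parts[1]) % 4)
        claims = json.loads(base64.urlsafe_b64decode(padded))
    except ValueError:
        return "malformed"
    if not isinstance(claims, dict):
        return "malformed"
    exp = claims.get("exp")
    current = time.time() if now is None else now
    if isinstance(exp, (int, float)) and exp <= current:
        return "expired"
    return "plausible"
```

A result of "plausible" combined with a server-side "Invalid authentication token" error points toward a JWT_SECRET_KEY mismatch.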

Error: “Invalid token: missing user identifier”

Cause: Token payload missing sub claim
Fix: Ensure token was created correctly:
# Token must include 'sub' claim
payload = {
    "sub": "user:alice",  # Required
    "username": "alice",
    "email": "alice@example.com",
    "roles": ["user"],
    "exp": expiration_time,
    "iat": issued_at_time
}

Error: “Password required for InMemoryUserProvider”

Cause: Trying to authenticate without a password
Fix: InMemoryUserProvider now requires passwords:
# ❌ Wrong
auth_result = await auth.authenticate("alice")  # No password

# ✅ Correct
auth_result = await auth.authenticate("alice", "alice123")

Rollback Plan

If you need to temporarily roll back to the old behavior (NOT RECOMMENDED):
  1. Checkout previous version:
    git checkout v2.7.0
    
  2. Or apply this patch (INSECURE - development only):
    # In server_stdio.py and server_streamable.py
    # Make token optional (TEMPORARY ONLY)
    
    # Extract token (allow None)
    token = arguments.get("token")
    
    if token:
        # Verify token if provided
        token_verification = await self.auth.verify_token(token)
        if not token_verification.valid:
            raise PermissionError("Invalid token")
        user_id = token_verification.payload["sub"]
    else:
        # Fall back to user_id (INSECURE)
        user_id = arguments.get("user_id")
        if not user_id:
            raise PermissionError("Either token or user_id required")
    
⚠️ This rollback is INSECURE and should only be used temporarily during migration.

Changelog

v2.8.0 (Current)

Breaking Changes:
  • ✅ JWT token required for all tool calls
  • /auth/login endpoint added for token generation
  • ✅ InMemoryUserProvider requires passwords
  • ✅ Auth factory respects settings.auth_provider configuration
Security Fixes:
  • 🔒 Closed authentication bypass vulnerability
  • 🔒 Password validation in InMemoryUserProvider
  • 🔒 Token verification enforced for all operations
  • 🔒 Observability initialization in streamable server
Migration:
  • 📖 All clients must update to include token in tool calls
  • 📖 See examples in /examples directory
  • 📖 See this migration guide for complete details

v2.7.0 (Previous)

  • VULNERABLE: Bare user_id authentication without credentials
  • INSECURE: InMemoryUserProvider grants access without password
  • ❌ Hard-coded auth provider (always InMemory)
  • ❌ Missing observability initialization in streamable server