Authentication Migration Guide v2.7.0 → v2.8.0
Current Documentation: This guide is for v2.8.0 (latest release, 2025-10-22).
Breaking Change: Token-based authentication is required for all tool calls in v2.8.0.
Summary of Changes
What Changed?
Before (v2.7.0 and earlier):
# Tools accepted bare user_id without credentials
result = await call_tool("agent_chat", {
"message": "Hello!",
"user_id": "user:alice" # ❌ No authentication
})
After (v2.8.0+):
# 1. First, obtain a token via login
login_response = await client.post("/auth/login", json={
"username": "alice",
"password": "alice123"
})
token = login_response.json()["access_token"]
# 2. Include token in all tool calls
result = await call_tool("agent_chat", {
"message": "Hello!",
"token": token, # ✅ Required JWT token
"user_id": "user:alice"
})
Why This Change?
The previous implementation had a critical security vulnerability:
- Tools accepted bare user_id without verifying credentials
- AuthMiddleware.authenticate() granted access without password validation
- Any actor who guessed a username could impersonate that user
Security Impact: High - Complete authentication bypass
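To make the difference concrete, here is a minimal sketch of why a signed token closes the gap that a bare user_id leaves open. It uses a plain HMAC signature from the standard library as a stand-in for a JWT; the `SECRET` value and the `sign` and `verify` helpers are hypothetical and are not part of the server's API.

```python
import hashlib
import hmac
from typing import Optional

SECRET = b"demo-secret"  # hypothetical signing key, for illustration only


def sign(user_id: str) -> str:
    """Return 'user_id.signature' using HMAC-SHA256 (a stand-in for a JWT)."""
    sig = hmac.new(SECRET, user_id.encode(), hashlib.sha256).hexdigest()
    return f"{user_id}.{sig}"


def verify(token: str) -> Optional[str]:
    """Return the user_id if the signature matches, otherwise None."""
    user_id, _, sig = token.rpartition(".")
    expected = hmac.new(SECRET, user_id.encode(), hashlib.sha256).hexdigest()
    return user_id if hmac.compare_digest(sig, expected) else None


genuine = sign("user:alice")
# An attacker who knows the username but not the key cannot produce a valid signature
forged = "user:alice." + "0" * 64

print(verify(genuine))
print(verify(forged))
```

With a bare user_id, knowing the string "user:alice" was enough to act as that user. With a signed token, the attacker would also need the signing key.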
Migration Steps
For HTTP/StreamableHTTP Clients
Step 1: Add Login Flow
import httpx


async def get_auth_token(base_url: str, username: str, password: str) -> str:
    """Obtain JWT token from login endpoint"""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{base_url}/auth/login",
            json={"username": username, "password": password}
        )
        response.raise_for_status()
        data = response.json()
        return data["access_token"]

# Usage
token = await get_auth_token(
"http://localhost:8000",
username="alice",
password="alice123"
)
Step 2: Update Tool Call Requests
Before:
request = {
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "agent_chat",
"arguments": {
"message": "Hello!",
"user_id": "user:alice" # ❌ Missing token
}
}
}
After:
request = {
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "agent_chat",
"arguments": {
"message": "Hello!",
"token": token, # ✅ Required
"user_id": "user:alice"
}
}
}
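A small helper can keep the token from being forgotten when requests are built in several places. The sketch below is one possible shape; `build_tool_call` is a hypothetical name, not a function shipped with the server. It also adds an `id` field, because JSON-RPC 2.0 treats a request without `id` as a notification that receives no response. Whether this server insists on it is not stated in this guide, so treat that as an assumption.

```python
import itertools

_request_ids = itertools.count(1)  # simple incrementing JSON-RPC request ids


def build_tool_call(name: str, arguments: dict, token: str) -> dict:
    """Build a tools/call request with the token merged into the arguments."""
    if not token:
        raise ValueError("token is required for tool calls in v2.8.0+")
    return {
        "jsonrpc": "2.0",
        "id": next(_request_ids),  # assumption: the server expects a request id
        "method": "tools/call",
        "params": {
            "name": name,
            # Copy the caller's dict so it is not mutated
            "arguments": {**arguments, "token": token},
        },
    }


request = build_tool_call(
    "agent_chat",
    {"message": "Hello!", "user_id": "user:alice"},
    token="eyJ.example.token",  # placeholder, not a real JWT
)
```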
Step 3: Handle Token Expiration
import datetime

import httpx


class MCPAuthClient:
    """HTTP client with automatic token refresh"""

    def __init__(self, base_url: str, username: str, password: str):
        self.base_url = base_url
        self.username = username
        self.password = password
        self.token = None
        self.token_expiry = None

    async def ensure_token(self):
        """Ensure we have a valid token"""
        now = datetime.datetime.now(datetime.timezone.utc)
        # Get new token if expired or missing
        if not self.token or (self.token_expiry and now >= self.token_expiry):
            await self.login()

    async def login(self):
        """Login and store token"""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/auth/login",
                json={"username": self.username, "password": self.password}
            )
            response.raise_for_status()
            data = response.json()
        self.token = data["access_token"]
        # Calculate expiry (subtract 60s buffer)
        now = datetime.datetime.now(datetime.timezone.utc)
        expires_in = data.get("expires_in", 3600)
        self.token_expiry = now + datetime.timedelta(seconds=expires_in - 60)

    async def call_tool(self, name: str, arguments: dict):
        """Call tool with automatic token management"""
        await self.ensure_token()
        # Add token to arguments
        arguments_with_token = {**arguments, "token": self.token}
        request = {
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {
                "name": name,
                "arguments": arguments_with_token
            }
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/message",
                json=request,
                timeout=30.0
            )
            response.raise_for_status()
            return response.json()

# Usage
client = MCPAuthClient(
"http://localhost:8000",
username="alice",
password="alice123"
)
# Token is automatically managed
result = await client.call_tool("agent_chat", {
"message": "Hello!",
"user_id": "user:alice"
})
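One edge case in the expiry calculation is worth a worked example. With a fixed 60-second buffer, a token whose lifetime is 60 seconds or less is treated as expired the moment it is issued, so every call triggers a fresh login. The sketch below caps the buffer at half the lifetime. `compute_expiry` is a hypothetical helper, and the capping rule is one possible choice, not the server's documented behavior.

```python
import datetime


def compute_expiry(
    now: datetime.datetime, expires_in: int, buffer_seconds: int = 60
) -> datetime.datetime:
    """Return the moment after which the client should refresh its token."""
    # Never subtract more than half the lifetime, so short-lived tokens stay usable
    buffer = min(buffer_seconds, expires_in // 2)
    return now + datetime.timedelta(seconds=expires_in - buffer)


now = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc)
one_hour = compute_expiry(now, 3600)  # refresh after 3540 seconds
short = compute_expiry(now, 30)       # refresh after 15 seconds, not -30
```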
For stdio Clients
The stdio transport cannot reach the HTTP /auth/login endpoint, so tokens must be obtained out-of-band, using one of the options below:
Option 1: Use HTTP endpoint separately
# 1. Get token via HTTP
import requests
response = requests.post("http://localhost:8000/auth/login", json={
    "username": "alice",
    "password": "alice123"
})
response.raise_for_status()  # fail early on bad credentials
token = response.json()["access_token"]
# 2. Use token with stdio client
async with stdio_client(server_params) as (read, write):
    async with ClientSession(read, write) as session:
        await session.initialize()
        result = await session.call_tool("agent_chat", arguments={
            "message": "Hello!",
            "token": token,
            "user_id": "user:alice"
        })
Option 2: Create token programmatically (dev/test only)
from mcp_server_langgraph.auth.factory import create_auth_middleware
from mcp_server_langgraph.core.config import settings
# Create auth middleware
auth = create_auth_middleware(settings, openfga_client=None)
# Create token programmatically
token = auth.create_token("alice", expires_in=3600)
# Use token with stdio client
result = await session.call_tool("agent_chat", arguments={
"message": "Hello!",
"token": token,
"user_id": "user:alice"
})
agent_chat
{
"message": string, // Required: User message
"token": string, // ✅ NEW: Required JWT token
"user_id": string, // Required: User identifier
"thread_id"?: string, // Optional: Thread ID
"response_format"?: "concise" | "detailed" // Optional: Response format
}
conversation_get
{
"thread_id": string, // Required: Thread identifier
"token": string, // ✅ NEW: Required JWT token
"user_id": string // Required: User identifier
}
conversation_search
{
"query": string, // Required: Search query
"token": string, // ✅ NEW: Required JWT token
"user_id": string, // Required: User identifier
"limit"?: number // Optional: Max results (default: 10)
}
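The required fields in the three schemas above can be checked on the client before a request is sent, which turns a server-side rejection into an immediate local error. This is a minimal sketch; `REQUIRED_FIELDS` and `missing_fields` are hypothetical names, and the field sets are copied from the schemas above.

```python
# Required argument names per tool, taken from the schemas above
REQUIRED_FIELDS = {
    "agent_chat": {"message", "token", "user_id"},
    "conversation_get": {"thread_id", "token", "user_id"},
    "conversation_search": {"query", "token", "user_id"},
}


def missing_fields(tool: str, arguments: dict) -> set:
    """Return the required argument names that are absent for this tool."""
    return REQUIRED_FIELDS[tool] - set(arguments)


# A v2.7.0-style call is flagged because it lacks the token
old_style = missing_fields("agent_chat", {"message": "Hello!", "user_id": "user:alice"})
new_style = missing_fields(
    "agent_chat",
    {"message": "Hello!", "token": "eyJ.example.token", "user_id": "user:alice"},
)
```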
Default Credentials (Development Only)
For development and testing, the InMemoryUserProvider has these default users:
| Username | Password | Roles | Notes |
|---|---|---|---|
| alice | alice123 | user, premium | Standard premium user |
| bob | bob123 | user | Standard user |
| admin | admin123 | admin | Administrator |
⚠️ WARNING: These credentials use plaintext password storage and are INSECURE. They are only for development/testing.
For Production:
- Use KeycloakUserProvider with proper SSO/OIDC
- Or implement password hashing in InMemoryUserProvider
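If you take the password-hashing route, the general technique is a salted, slow key-derivation function with a constant-time comparison. The sketch below uses PBKDF2 from Python's standard library. `hash_password` and `verify_password` are hypothetical helpers, and the iteration count is an assumption to tune for your hardware; how they would plug into InMemoryUserProvider is not covered by this guide.

```python
import hashlib
import hmac
import os
from typing import Optional, Tuple

ITERATIONS = 600_000  # assumption: adjust to your hardware and latency budget


def hash_password(password: str, salt: Optional[bytes] = None) -> Tuple[bytes, bytes]:
    """Return (salt, digest) for a password using PBKDF2-HMAC-SHA256."""
    if salt is None:
        salt = os.urandom(16)  # fresh random salt per user
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return salt, digest


def verify_password(password: str, salt: bytes, digest: bytes) -> bool:
    """Recompute the digest and compare in constant time."""
    _, candidate = hash_password(password, salt)
    return hmac.compare_digest(candidate, digest)


# Store salt and digest instead of the plaintext password
salt, digest = hash_password("alice123")
```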
Production Deployment
Using Keycloak (Recommended)
- Configure Keycloak:
# Set environment variables
export AUTH_PROVIDER=keycloak
export KEYCLOAK_SERVER_URL=https://auth.example.com
export KEYCLOAK_REALM=production
export KEYCLOAK_CLIENT_ID=mcp-server
export KEYCLOAK_CLIENT_SECRET=<secret>
- Obtain Tokens from Keycloak:
# Clients obtain tokens from Keycloak directly
# Tokens are validated by the MCP server
# The server will verify Keycloak JWT tokens automatically
- No code changes needed - the auth factory automatically uses KeycloakUserProvider
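Because a missing variable tends to surface only when the first token is verified, a startup check can save some debugging time. The sketch below reports which of the variables listed above are unset. `missing_keycloak_vars` is a hypothetical helper, and treating all four KEYCLOAK_* variables as mandatory is an assumption based on the list above.

```python
import os
from typing import Mapping

# Variable names taken from the configuration step above
REQUIRED_KEYCLOAK_VARS = (
    "KEYCLOAK_SERVER_URL",
    "KEYCLOAK_REALM",
    "KEYCLOAK_CLIENT_ID",
    "KEYCLOAK_CLIENT_SECRET",
)


def missing_keycloak_vars(env: Mapping[str, str] = os.environ) -> list:
    """Return the Keycloak variables that are unset or empty."""
    # Only check when Keycloak is the selected provider
    if env.get("AUTH_PROVIDER", "").lower() != "keycloak":
        return []
    return [name for name in REQUIRED_KEYCLOAK_VARS if not env.get(name)]


example_env = {
    "AUTH_PROVIDER": "keycloak",
    "KEYCLOAK_SERVER_URL": "https://auth.example.com",
    "KEYCLOAK_REALM": "production",
}
missing = missing_keycloak_vars(example_env)
```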
Custom Authentication Provider
Implement the UserProvider interface:
# TokenVerification is assumed to live alongside AuthResponse; adjust the import if it does not
from mcp_server_langgraph.auth.user_provider import (
    AuthResponse,
    TokenVerification,
    UserProvider,
)


class CustomUserProvider(UserProvider):
    async def authenticate(self, username: str, password: str) -> AuthResponse:
        # Your custom authentication logic
        # - Validate credentials against your database
        # - Return AuthResponse with user info
        pass

    async def verify_token(self, token: str) -> TokenVerification:
        # Your custom token verification
        # - Validate JWT signature
        # - Check expiration
        # - Return TokenVerification
        pass

    # Implement other required methods...
Register your provider in auth/factory.py:
def create_user_provider(settings, openfga_client):
    provider_type = settings.auth_provider.lower()
    if provider_type == "custom":
        return CustomUserProvider(config=settings.custom_config)
    # ... existing providers
Testing Migration
Update Unit Tests
import pytest


@pytest.fixture
def auth_token():
    """Get valid auth token for tests"""
    from mcp_server_langgraph.auth.factory import create_auth_middleware
    from mcp_server_langgraph.core.config import settings
    auth = create_auth_middleware(settings, openfga_client=None)
    return auth.create_token("alice", expires_in=3600)


@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_tool_call_with_token(auth_token):
    """Test tool call with valid token"""
    response = await call_tool("agent_chat", {
        "message": "Hello!",
        "token": auth_token,
        "user_id": "user:alice"
    })
    assert response.status_code == 200
Integration Tests
class TestAuthMigration:
    def test_login_endpoint(self):
        """Test login endpoint returns token"""
        response = client.post("/auth/login", json={
            "username": "alice",
            "password": "alice123"
        })
        assert response.status_code == 200
        assert "access_token" in response.json()
        assert "token_type" in response.json()
        assert response.json()["token_type"] == "bearer"

    def test_tool_call_without_token_fails(self):
        """Test tool call without token is rejected"""
        response = client.post("/message", json={
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {
                "name": "agent_chat",
                "arguments": {
                    "message": "Hello!",
                    "user_id": "user:alice"
                    # Missing token
                }
            }
        })
        assert response.status_code in [200, 401, 403]
        data = response.json()
        assert "error" in data or response.status_code != 200

    def test_tool_call_with_valid_token_succeeds(self, auth_token):
        """Test tool call with valid token succeeds"""
        response = client.post("/message", json={
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {
                "name": "agent_chat",
                "arguments": {
                    "message": "Hello!",
                    "token": auth_token,
                    "user_id": "user:alice"
                }
            }
        })
        assert response.status_code == 200
        assert "result" in response.json()
Troubleshooting
Error: “Authentication token required”
Cause: Tool call missing token field
Fix: Add token parameter to tool call arguments
# ❌ Wrong
arguments = {"message": "Hello!", "user_id": "user:alice"}
# ✅ Correct
arguments = {"message": "Hello!", "token": token, "user_id": "user:alice"}
Error: “Invalid authentication token”
Causes:
- Token expired
- Token signed with wrong secret
- Malformed token
Fix:
- Re-login to get fresh token
- Ensure JWT_SECRET_KEY matches between token creation and verification
- Check token format (should be eyJ...)
Error: “Invalid token: missing user identifier”
Cause: Token payload missing sub claim
Fix: Ensure token was created correctly:
# Token must include 'sub' claim
payload = {
"sub": "user:alice", # ✅ Required
"username": "alice",
"email": "alice@example.com",
"roles": ["user"],
"exp": expiration_time,
"iat": issued_at_time
}
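For the two token errors above, it can help to look inside the token before contacting the server. The sketch below decodes the payload segment of a JWT with the standard library and reports the problems it can detect locally: a malformed token, a missing `sub` claim, or an `exp` in the past. `diagnose_token` is a hypothetical helper. It does not verify the signature, so it cannot detect a token signed with the wrong secret.

```python
import base64
import json
import time
from typing import Optional


def decode_segment(segment: str):
    """Decode one base64url JWT segment as JSON (no signature check)."""
    padded = segment + "=" * (-len(segment) % 4)  # restore stripped padding
    return json.loads(base64.urlsafe_b64decode(padded))


def diagnose_token(token: str, now: Optional[float] = None) -> list:
    """Return a list of locally detectable problems; empty if none found."""
    parts = token.split(".")
    if len(parts) != 3:
        return ["malformed: expected three dot-separated segments"]
    try:
        payload = decode_segment(parts[1])
    except ValueError:  # covers base64, UTF-8 and JSON decoding errors
        return ["malformed: payload is not base64url-encoded JSON"]
    if not isinstance(payload, dict):
        return ["malformed: payload is not a JSON object"]
    now = time.time() if now is None else now
    problems = []
    if "sub" not in payload:
        problems.append("missing 'sub' claim")
    if "exp" in payload and payload["exp"] <= now:
        problems.append("expired")
    return problems
```

An empty list means the token looks structurally sound and is not yet expired. If the server still rejects it, the signing secret is the next thing to check.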
Error: “Password required for InMemoryUserProvider”
Cause: Trying to authenticate without password
Fix: InMemoryUserProvider now requires passwords:
# ❌ Wrong
auth_result = await auth.authenticate("alice") # No password
# ✅ Correct
auth_result = await auth.authenticate("alice", "alice123")
Rollback Plan
If you need to temporarily roll back to the old behavior (NOT RECOMMENDED):
- Checkout previous version:
- Or apply this patch (INSECURE - development only):
# In server_stdio.py and server_streamable.py
# Make token optional (TEMPORARY ONLY)
# Extract token (allow None)
token = arguments.get("token")
if token:
    # Verify token if provided
    token_verification = await self.auth.verify_token(token)
    if not token_verification.valid:
        raise PermissionError("Invalid token")
    user_id = token_verification.payload["sub"]
else:
    # Fall back to user_id (INSECURE)
    user_id = arguments.get("user_id")
    if not user_id:
        raise PermissionError("Either token or user_id required")
⚠️ This rollback is INSECURE and should only be used temporarily during migration.
Support
Need help with migration?
Changelog
v2.8.0 (Current)
Breaking Changes:
- ✅ JWT token required for all tool calls
- ✅ /auth/login endpoint added for token generation
- ✅ InMemoryUserProvider requires passwords
- ✅ Auth factory respects settings.auth_provider configuration
Security Fixes:
- 🔒 Closed authentication bypass vulnerability
- 🔒 Password validation in InMemoryUserProvider
- 🔒 Token verification enforced for all operations
- 🔒 Observability initialization in streamable server
Migration:
- 📖 All clients must update to include token in tool calls
- 📖 See examples in /examples directory
- 📖 See this migration guide for complete details
v2.7.0 (Previous)
- ❌ VULNERABLE: Bare user_id authentication without credentials
- ❌ INSECURE: InMemoryUserProvider grants access without password
- ❌ Hard-coded auth provider (always InMemory)
- ❌ Missing observability initialization in streamable server