19. Async-First Architecture
Date: 2025-10-13
Status
Accepted
Category
Core Architecture
Context
Modern AI agents require numerous I/O-bound operations:
- LLM API calls: 1-30 seconds per request (network latency)
- Database queries: Redis sessions, OpenFGA authorization checks
- External APIs: Keycloak authentication, Infisical secrets retrieval
- Concurrent requests: Multiple users, parallel tool executions
Traditional synchronous Python blocks the worker thread on every I/O call:
# Synchronous (BLOCKS entire thread)
response = requests.get("https://api.llm.com/chat") # 5 seconds
session = redis_client.get("session:123") # 20ms
auth_check = openfga.check(user, resource) # 50ms
# Total: 5.07 seconds to handle 1 request
# Throughput: ~0.2 requests/second per worker
This creates severe bottlenecks:
- Low throughput: Each worker handles only 1 request at a time
- Poor scalability: Need 100 workers to handle 20 concurrent users
- Resource waste: Workers idle during I/O waits
- Timeout risk: Long chains of I/O operations easily exceed timeouts
Decision
We will adopt an async-first architecture using Python’s asyncio throughout the codebase.
Core Principle
All I/O operations MUST be async. Pure CPU work MAY be sync.
# ✅ Async I/O operations
async def call_llm(prompt: str) -> str:
    async with httpx.AsyncClient() as client:
        response = await client.post("https://api.llm.com/chat", json={"prompt": prompt})
        return response.json()

async def get_session(session_id: str) -> SessionData:
    return await redis.get(f"session:{session_id}")

async def authorize(user_id: str, resource: str) -> bool:
    return await openfga_client.check(user_id, "read", resource)

# ✅ CPU-bound work can be sync (no I/O)
def calculate_similarity(text1: str, text2: str) -> float:
    # Pure computation, no I/O
    return difflib.SequenceMatcher(None, text1, text2).ratio()
Async Everywhere
All layers of the application use async:
┌─────────────────────────────────────┐
│ API Layer (FastAPI) │ async endpoints
├─────────────────────────────────────┤
│ MCP Server Layer │ async MCP handlers
├─────────────────────────────────────┤
│ Business Logic (Agent, Auth) │ async methods
├─────────────────────────────────────┤
│ Data Access (Redis, OpenFGA, etc.) │ async clients
└─────────────────────────────────────┘
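In practice this means a request simply awaits its way down the stack. Below is a minimal, self-contained sketch (simulated I/O via asyncio.sleep, hypothetical function names) of an async call chain spanning the API, business-logic, and data-access layers:

import asyncio

# Data-access layer: async client call (simulated with a sleep)
async def get_session(session_id: str) -> dict:
    await asyncio.sleep(0.02)
    return {"id": session_id, "user": "alice"}

# Business-logic layer: awaits the data-access layer
async def handle_chat(session_id: str, prompt: str) -> str:
    session = await get_session(session_id)
    return f"reply to {session['user']}: {prompt}"

# API layer: an async endpoint simply awaits the business logic
async def chat_endpoint(session_id: str, prompt: str) -> str:
    return await handle_chat(session_id, prompt)

print(asyncio.run(chat_endpoint("session:123", "hello")))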
Async Libraries
We use async-compatible libraries:
| Component | Async Library | Sync Alternative (Rejected) |
|---|---|---|
| HTTP Client | httpx.AsyncClient | requests |
| Redis | redis.asyncio | redis |
| LLM Providers | litellm.acompletion | litellm.completion |
| Web Framework | FastAPI | Flask |
| OpenFGA | openfga_sdk (async) | N/A |
| Keycloak | python-keycloak (async methods) | N/A |
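To make the table concrete, here is a short sketch of the async entry points for two of these libraries: LiteLLM's acompletion and the redis.asyncio client. The model name and URL are placeholders, and the response field access assumes LiteLLM's OpenAI-compatible response shape:

import asyncio
import litellm
import redis.asyncio as aioredis

async def main():
    # Async LLM call via LiteLLM (model name is a placeholder)
    response = await litellm.acompletion(
        model="openai/gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello"}],
    )
    print(response.choices[0].message.content)

    # Async Redis call via redis.asyncio (URL is a placeholder)
    async with aioredis.from_url("redis://localhost:6379", decode_responses=True) as client:
        await client.set("session:123", "cached-value")
        print(await client.get("session:123"))

asyncio.run(main())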
Async/Await Patterns
Pattern 1: Concurrent Execution
# Execute multiple I/O operations concurrently
async def enrich_user_data(user_id: str) -> UserData:
    # Run 3 API calls in parallel (total time = max(t1, t2, t3), not sum)
    session, roles, preferences = await asyncio.gather(
        get_session(user_id),      # 20ms
        get_user_roles(user_id),   # 50ms
        get_preferences(user_id),  # 30ms
    )
    # Total: ~50ms (not 100ms)
    return UserData(session=session, roles=roles, preferences=preferences)
Pattern 2: Async Iteration
# Stream results instead of blocking
async def process_batch(user_ids: List[str]):
    async for user_id in async_iterator(user_ids):
        result = await process_user(user_id)
        yield result  # Stream results as they complete
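Callers consume such an async generator with async for, handling each result as soon as it is ready instead of waiting for the whole batch; a small usage sketch:

async def print_batch_results(user_ids: List[str]):
    async for result in process_batch(user_ids):
        print(result)  # handle each result as it streams in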
Pattern 3: Timeout Management
# Prevent operations from hanging
async def call_llm_with_timeout(prompt: str, timeout: int = 30) -> str:
    try:
        return await asyncio.wait_for(
            llm.acall(prompt),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        raise LLMTimeoutError(f"LLM call exceeded {timeout}s timeout")
Consequences
Positive Consequences
- High Throughput: Single worker handles 100+ concurrent requests
  - Example: 100 LLM calls in progress, not blocking each other
  - Throughput: 10-50x improvement over sync
- Resource Efficiency: Far fewer workers needed
  - Sync: 100 workers for 100 concurrent requests
  - Async: 4-8 workers for 100 concurrent requests
- Better User Experience: Lower latency for concurrent operations
  - Parallel API calls complete in max(t1, t2, …), not sum(t1, t2, …)
- Scalability: Handle 1000+ concurrent connections per instance
- Cost Savings: Fewer servers/pods required for the same load
Negative Consequences
- Complexity: Async code is harder to write and debug
  - Must understand event loops, coroutines, and async context
  - Stack traces can be confusing
- Library Constraints: Must use async-compatible libraries
  - Some libraries only have sync versions (workaround: run in an executor)
  - Mixing sync and async requires careful handling
- Testing Challenges: Async tests require pytest-asyncio

  @pytest.mark.asyncio
  async def test_async_function():
      result = await async_function()
      assert result == expected

- Blocking Pitfalls: Accidentally using sync I/O blocks the event loop

  # ❌ BAD: Blocks event loop
  async def bad_handler():
      result = requests.get("https://api.com")  # Sync call!
      return result.json()

  # ✅ GOOD: Non-blocking
  async def good_handler():
      async with httpx.AsyncClient() as client:
          result = await client.get("https://api.com")
          return result.json()
Neutral Consequences
- Learning Curve: Team must learn async patterns
- Migration Effort: Existing sync code requires refactoring
- Debugging Tools: Need async-aware profilers (e.g., aiomonitor)
Implementation Details
Async Codebase Statistics
# 451 async functions across codebase
$ grep -r "async def" src/ --include="*.py" | wc -l
451
Key Async Components:
- LLM Factory (llm/factory.py): async def acall()
- Session Management (auth/session.py): All methods async
- User Provider (auth/user_provider.py): async def authenticate()
- OpenFGA Client (auth/openfga.py): async def check()
- MCP Server (mcp/server_streamable.py, mcp/server_stdio.py): Async handlers
- Agent Graph (core/agent.py): async def ainvoke()
FastAPI Integration
# src/mcp_server_langgraph/api/gdpr.py
from fastapi import FastAPI, Depends

app = FastAPI()

@app.get("/api/gdpr/export")
async def export_user_data(
    user_id: str,
    session_store: SessionStore = Depends(get_session_store)
):
    # All async I/O
    session = await session_store.get(user_id)
    roles = await get_user_roles(user_id)
    data = await export_service.export(user_id)
    return data
Async Context Managers
# Async resource management
async with redis.asyncio.from_url(redis_url) as client:
    await client.set("key", "value")
# Connection auto-closed on exit
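Custom async context managers follow the same pattern; a minimal sketch using contextlib.asynccontextmanager (the llm_client name and base URL are illustrative):

from contextlib import asynccontextmanager
import httpx

@asynccontextmanager
async def llm_client(base_url: str):
    client = httpx.AsyncClient(base_url=base_url)
    try:
        yield client
    finally:
        await client.aclose()  # always released, even if the body raises

# Usage
# async with llm_client("https://api.llm.com") as client:
#     await client.post("/chat", json={"prompt": "hi"})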
Running Sync Code in Async Context
When unavoidable sync code must run:
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

async def run_sync_in_async(sync_func, *args):
    """Run a blocking sync function without blocking the event loop"""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, sync_func, *args)

# Usage
result = await run_sync_in_async(blocking_cpu_work, data)
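On Python 3.9+, asyncio.to_thread() offers the same offloading without managing an executor explicitly; a minimal sketch (blocking_cpu_work here is a stand-in for any unavoidable sync call):

import asyncio

def blocking_cpu_work(data: str) -> str:
    # Stand-in for an unavoidable blocking/sync call
    return data.upper()

async def main():
    # Runs the sync function in a worker thread, keeping the event loop free
    result = await asyncio.to_thread(blocking_cpu_work, "payload")
    print(result)

asyncio.run(main())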
Async Testing
# tests/test_session.py
import pytest

@pytest.mark.asyncio
async def test_session_creation():
    session_store = InMemorySessionStore()

    # All test code is async
    session_id = await session_store.create(
        user_id="user:alice",
        username="alice",
        roles=["user"]
    )

    session = await session_store.get(session_id)
    assert session.user_id == "user:alice"

# pyproject.toml configuration
[tool.pytest.ini_options]
asyncio_mode = "strict"
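Async fixtures work as well; with pytest-asyncio in strict mode they are declared via pytest_asyncio.fixture. A hedged sketch reusing the InMemorySessionStore from above:

import pytest
import pytest_asyncio

@pytest_asyncio.fixture
async def session_store():
    store = InMemorySessionStore()
    yield store
    # async teardown (e.g., closing connections) would go here

@pytest.mark.asyncio
async def test_session_lookup(session_store):
    session_id = await session_store.create(
        user_id="user:alice", username="alice", roles=["user"]
    )
    session = await session_store.get(session_id)
    assert session.user_id == "user:alice"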
Alternatives Considered
1. Synchronous Architecture (Traditional)
Description: Use synchronous Python with threaded workers (e.g., Gunicorn with threads)
Pros:
- Simpler code (no async/await)
- Easier debugging (linear stack traces)
- More library compatibility
Cons:
- Low throughput (~1-5 req/s per worker)
- High memory usage (each thread = ~8MB stack)
- GIL contention (threads compete for Global Interpreter Lock)
- Poor scalability (need 100+ workers for moderate load)
Why Rejected: Cannot achieve production-grade performance for I/O-heavy workloads
2. Sync with Celery (Task Queue)
Description: Sync API offloads long tasks to Celery workers
Pros:
- Sync API code (simpler)
- Background processing
- Retry logic built-in
Cons:
- Additional infrastructure (Redis/RabbitMQ for queue)
- Complexity (task serialization, result backends)
- Latency (queueing overhead)
- Not suitable for request-response (user waits for task)
Why Rejected: Adds complexity without solving core I/O concurrency issue
3. Threading (threading.Thread)
Description: Use Python threads for concurrency
Pros:
- Familiar threading model
- Standard library support
Cons:
- GIL bottleneck (only one thread runs Python code at a time)
- No benefit for CPU-bound tasks
- High memory (stack per thread)
- Complex synchronization (locks, deadlocks, race conditions)
Why Rejected: The GIL prevents CPU parallelism, and thread-per-request concurrency scales poorly compared to async for I/O-heavy workloads
4. Multiprocessing
Description: Use multiprocessing to fork worker processes
Pros:
- True parallelism (no GIL)
- Good for CPU-bound tasks
Cons:
- High memory (full process copy per worker)
- Slow startup (process forking overhead)
- IPC complexity (sharing data between processes)
- Not suitable for I/O-bound (overkill)
Why Rejected: Async is more efficient for I/O-heavy workloads
5. Hybrid (Sync API + Async I/O)
Description: Expose sync API but use async I/O internally
Pros:
- Sync API (easier for users)
- Async benefits internally
Cons:
- Complexity (mixing paradigms)
- Event loop management (who runs the loop?)
- Testing confusion (sync tests calling async code)
Why Rejected: Complexity outweighs benefits; modern Python embraces async APIs
Throughput Comparison
Scenario: 100 concurrent LLM API calls (each takes 2 seconds)
| Architecture | Total Time | Throughput | Workers Needed |
|---|---|---|---|
| Sync (threads) | 200 seconds | 0.5 req/s | 100 |
| Async | 2 seconds | 50 req/s | 1 |
Result: 100x throughput improvement
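The effect is easy to reproduce with a simulation that replaces each 2-second LLM call with asyncio.sleep(2); on a single worker, 100 concurrent calls still finish in roughly 2 seconds of wall-clock time:

import asyncio
import time

async def fake_llm_call(i: int) -> int:
    await asyncio.sleep(2)  # stands in for a 2-second LLM API call
    return i

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_llm_call(i) for i in range(100)))
    elapsed = time.perf_counter() - start
    print(f"{len(results)} calls in {elapsed:.2f}s")  # ≈ 2s, not 200s

asyncio.run(main())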
Memory Usage
| Architecture | Memory per Worker | 100 Concurrent Requests |
|---|---|---|
| Sync (threads) | 8 MB × 100 threads | 800 MB |
| Async | 50 MB (single process) | 50 MB |
Result: 16x memory efficiency
Real-World Metrics
From production deployments:
Sync Architecture:
- Workers: 32 (Gunicorn)
- Memory: 4 GB
- Throughput: 20 req/s
- CPU: 60% (GIL contention)
Async Architecture:
- Workers: 4 (Uvicorn)
- Memory: 512 MB
- Throughput: 200 req/s
- CPU: 25% (efficient I/O wait)
Integration Points
Uvicorn ASGI Server
# Start async server with 4 workers
uvicorn mcp_server_langgraph.mcp.server_streamable:app \
--workers 4 \
--host 0.0.0.0 \
--port 8000
Why Uvicorn: Native async support, ASGI protocol, high performance
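The same server can also be started programmatically, which is sometimes convenient for local development; a sketch mirroring the CLI flags above:

# run.py (illustrative)
import uvicorn

if __name__ == "__main__":
    uvicorn.run(
        "mcp_server_langgraph.mcp.server_streamable:app",
        host="0.0.0.0",
        port=8000,
        workers=4,
    )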
LangGraph Async Support
# core/agent.py
from langgraph.graph import StateGraph
# Build graph with async nodes
graph = StateGraph(AgentState)
graph.add_node("route", route_input) # Async node
graph.add_node("llm", call_llm) # Async node
# Async invocation
compiled = graph.compile()
result = await compiled.ainvoke({"messages": [user_message]})
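Compiled graphs also support async streaming of intermediate results via astream; a hedged sketch (exact chunk contents depend on the LangGraph version and stream mode):

# Stream node updates as they complete instead of waiting for the final state
async for chunk in compiled.astream({"messages": [user_message]}):
    print(chunk)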
Redis Async Client
# auth/session.py
import redis.asyncio as redis

class RedisSessionStore(SessionStore):
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url, decode_responses=True)

    async def create(self, user_id: str, ...) -> str:
        await self.redis.setex(session_key, ttl, session_data)
        return session_id
OpenFGA Async Client
# auth/openfga.py
from openfga_sdk import OpenFgaClient
class OpenFGAClient:
async def check(self, user: str, relation: str, object: str) -> bool:
response = await self.client.check(...)
return response.allowed
Best Practices
1. Always Await Async Functions
# ❌ BAD: Forgot await (returns coroutine, not result)
result = async_function()
# ✅ GOOD: Properly awaited
result = await async_function()
2. Use asyncio.gather() for Concurrent I/O
# ❌ BAD: Sequential (slow)
result1 = await call1()
result2 = await call2()
result3 = await call3()
# ✅ GOOD: Concurrent (fast)
result1, result2, result3 = await asyncio.gather(
    call1(),
    call2(),
    call3()
)
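When the gathered calls can fail independently, return_exceptions=True prevents one failure from discarding the other results; failures then need an explicit check:

results = await asyncio.gather(
    call1(), call2(), call3(),
    return_exceptions=True,  # failed calls come back as exception objects
)
failures = [r for r in results if isinstance(r, Exception)]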
3. Set Timeouts for I/O Operations
# Always use timeouts to prevent hanging
result = await asyncio.wait_for(external_api_call(), timeout=30)
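On Python 3.11+, the asyncio.timeout() context manager is an alternative that puts a single deadline over a whole block of awaits (store_result below is a hypothetical second call):

async with asyncio.timeout(30):  # Python 3.11+
    result = await external_api_call()
    await store_result(result)   # both awaits share the same 30s budget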
4. Use Async Context Managers
async with httpx.AsyncClient() as client:
    await client.get("https://api.com")
# Client auto-closed
Future Enhancements
- Async Streaming: Stream LLM responses token-by-token
- Async Background Tasks: Scheduled jobs with apscheduler async support
- Async Batch Processing: Process large datasets with async workers
- Structured Concurrency: Explore anyio for cleaner async patterns
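As a taste of that structured-concurrency direction, anyio task groups scope concurrent work to a block and propagate cancellation and failures automatically; a minimal sketch with a simulated worker:

import anyio

async def process_user(user_id: str) -> None:
    await anyio.sleep(0.1)  # stands in for real async work

async def process_all(user_ids: list[str]) -> None:
    # All tasks complete (or are cancelled together) before the block exits
    async with anyio.create_task_group() as tg:
        for user_id in user_ids:
            tg.start_soon(process_user, user_id)

anyio.run(process_all, ["user:alice", "user:bob"])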
References