19. Async-First Architecture

Date: 2025-10-13

Status

Accepted

Category

Core Architecture

Context

Modern AI agents require numerous I/O-bound operations:
  • LLM API calls: 1-30 seconds per request (network latency)
  • Database queries: Redis sessions, OpenFGA authorization checks
  • External APIs: Keycloak authentication, Infisical secrets retrieval
  • Concurrent requests: Multiple users, parallel tool executions
Traditional synchronous Python blocks the event loop on I/O:
# Synchronous (BLOCKS entire thread)
response = requests.get("https://api.llm.com/chat")  # 5 seconds
session = redis_client.get("session:123")            # 20ms
auth_check = openfga.check(user, resource)           # 50ms

# Total: 5.07 seconds to handle 1 request
# Throughput: ~0.2 requests/second per worker
This creates severe bottlenecks:
  • Low throughput: Each worker handles only 1 request at a time
  • Poor scalability: Need 100 workers to handle 20 concurrent users
  • Resource waste: Workers idle during I/O waits
  • Timeout risk: Long chains of I/O operations easily exceed timeouts

Decision

We will adopt an async-first architecture using Python’s asyncio throughout the codebase.

Core Principle

All I/O operations MUST be async. Pure CPU work MAY be sync.
# ✅ Async I/O operations
async def call_llm(prompt: str) -> str:
    async with httpx.AsyncClient() as client:
        response = await client.post("https://api.llm.com/chat", json={"prompt": prompt})
        return response.json()

async def get_session(session_id: str) -> SessionData:
    return await redis.get(f"session:{session_id}")

async def authorize(user_id: str, resource: str) -> bool:
    return await openfga_client.check(user_id, "read", resource)

# ✅ CPU-bound work can be sync (no I/O)
def calculate_similarity(text1: str, text2: str) -> float:
    # Pure computation, no I/O
    return difflib.SequenceMatcher(None, text1, text2).ratio()

Async Everywhere

All layers of the application use async:
┌─────────────────────────────────────┐
│ API Layer (FastAPI)                 │ async endpoints
├─────────────────────────────────────┤
│ MCP Server Layer                    │ async MCP handlers
├─────────────────────────────────────┤
│ Business Logic (Agent, Auth)        │ async methods
├─────────────────────────────────────┤
│ Data Access (Redis, OpenFGA, etc.)  │ async clients
└─────────────────────────────────────┘
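
As a rough end-to-end sketch (the endpoint, service, and store names below are hypothetical, not the project's actual symbols), a single request awaits its way through every layer:
import asyncio
from fastapi import FastAPI

app = FastAPI()

_SESSIONS = {"abc": {"messages": ["hello"]}}  # in-memory stand-in for Redis

async def session_store_get(session_id: str) -> dict:      # data access layer
    await asyncio.sleep(0)  # placeholder for an awaited redis.asyncio call
    return _SESSIONS.get(session_id, {})

async def load_conversation(session_id: str) -> dict:       # business logic layer
    session = await session_store_get(session_id)
    return {"session_id": session_id, "messages": session.get("messages", [])}

@app.get("/messages/{session_id}")
async def get_messages(session_id: str) -> dict:            # API layer (FastAPI)
    return await load_conversation(session_id)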

Async Libraries

We use async-compatible libraries:
| Component     | Async Library                    | Sync Alternative (Rejected) |
| ------------- | -------------------------------- | --------------------------- |
| HTTP Client   | httpx.AsyncClient                | requests                    |
| Redis         | redis.asyncio                    | redis                       |
| LLM Providers | litellm.acompletion              | litellm.completion          |
| Web Framework | FastAPI                          | Flask                       |
| OpenFGA       | openfga_sdk (async)              | N/A                         |
| Keycloak      | python-keycloak (async methods)  | N/A                         |

Async/Await Patterns

Pattern 1: Concurrent Execution

# Execute multiple I/O operations concurrently
async def enrich_user_data(user_id: str) -> UserData:
    # Run 3 API calls in parallel (total time = max(t1, t2, t3), not sum)
    session, roles, preferences = await asyncio.gather(
        get_session(user_id),      # 20ms
        get_user_roles(user_id),   # 50ms
        get_preferences(user_id),  # 30ms
    )
    # Total: ~50ms (not 100ms)
    return UserData(session=session, roles=roles, preferences=preferences)

Pattern 2: Async Iteration

# Async generator: stream results instead of collecting them all first
async def process_batch(user_ids: List[str]):
    for user_id in user_ids:
        result = await process_user(user_id)
        yield result  # Stream results as they complete
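
Callers consume the generator with async for, receiving each result as soon as it is produced rather than waiting for the whole batch (sketch; handle_result is a hypothetical downstream consumer):
async def handle_batch(user_ids: List[str]):
    async for result in process_batch(user_ids):
        await handle_result(result)  # hypothetical downstream consumer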

Pattern 3: Timeout Management

# Prevent operations from hanging
async def call_llm_with_timeout(prompt: str, timeout: int = 30) -> str:
    try:
        return await asyncio.wait_for(
            llm.acall(prompt),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        raise LLMTimeoutError(f"LLM call exceeded {timeout}s timeout")

Consequences

Positive Consequences

  • High Throughput: Single worker handles 100+ concurrent requests
    • Example: 100 LLM calls in progress, not blocking each other
    • Throughput: 10-50x improvement over sync
  • Resource Efficiency: Far fewer workers needed
    • Sync: 100 workers for 100 concurrent requests
    • Async: 4-8 workers for 100 concurrent requests
  • Better User Experience: Lower latency for concurrent operations
    • Parallel API calls complete in max(t1, t2, …) not sum(t1, t2, …)
  • Scalability: Handle 1000+ concurrent connections per instance
  • Cost Savings: Fewer servers/pods required for same load

Negative Consequences

  • Complexity: Async code is harder to write and debug
    • Must understand event loops, coroutines, async context
    • Stack traces can be confusing
  • Library Constraints: Must use async-compatible libraries
    • Some libraries only have sync versions (workaround: run in executor)
    • Mixing sync/async requires careful handling
  • Testing Challenges: Async tests require pytest-asyncio
    @pytest.mark.asyncio
    async def test_async_function():
        result = await async_function()
        assert result == expected
    
  • Blocking Pitfalls: Accidentally using sync I/O blocks event loop
    # ❌ BAD: Blocks event loop
    async def bad_handler():
        result = requests.get("https://api.com")  # Sync call!
        return result.json()
    
    # ✅ GOOD: Non-blocking
    async def good_handler():
        async with httpx.AsyncClient() as client:
            result = await client.get("https://api.com")
            return result.json()
    

Neutral Consequences

  • Learning Curve: Team must learn async patterns
  • Migration Effort: Existing sync code requires refactoring
  • Debugging Tools: Need async-aware profilers (e.g., aiomonitor)

Implementation Details

Async Codebase Statistics

# 451 async functions across codebase
$ grep -r "async def" src/ --include="*.py" | wc -l
451
Key Async Components:
  • LLM Factory (llm/factory.py): async def acall()
  • Session Management (auth/session.py): All methods async
  • User Provider (auth/user_provider.py): async def authenticate()
  • OpenFGA Client (auth/openfga.py): async def check()
  • MCP Server (mcp/server_streamable.py, mcp/server_stdio.py): Async handlers
  • Agent Graph (core/agent.py): async def ainvoke()

FastAPI Integration

# src/mcp_server_langgraph/api/gdpr.py
from fastapi import FastAPI, Depends

app = FastAPI()

@app.get("/api/gdpr/export")
async def export_user_data(
    user_id: str,
    session_store: SessionStore = Depends(get_session_store)
):
    # All async I/O
    session = await session_store.get(user_id)
    roles = await get_user_roles(user_id)
    data = await export_service.export(user_id)
    return data

Async Context Managers

# Async resource management
async with redis.asyncio.from_url(redis_url) as client:
    await client.set("key", "value")
    # Connection auto-closed on exit
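
The same resource-safety pattern works for custom resources; a generic sketch (not code from this repository) using contextlib.asynccontextmanager:
from contextlib import asynccontextmanager

import httpx

@asynccontextmanager
async def llm_http_client(base_url: str):
    """Yield a shared AsyncClient and guarantee it is closed on exit."""
    client = httpx.AsyncClient(base_url=base_url, timeout=30.0)
    try:
        yield client
    finally:
        await client.aclose()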

Running Sync Code in Async Context

When unavoidable sync code must run:
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

async def run_sync_in_async(sync_func, *args):
    """Run blocking sync function without blocking event loop"""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, sync_func, *args)

# Usage
result = await run_sync_in_async(blocking_cpu_work, data)
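
On Python 3.9+, asyncio.to_thread() achieves the same result without managing an executor by hand; a minimal sketch reusing the blocking_cpu_work placeholder from above:
# Python 3.9+: run a blocking callable on a worker thread
result = await asyncio.to_thread(blocking_cpu_work, data)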

Async Testing

# tests/test_session.py
import pytest

@pytest.mark.asyncio
async def test_session_creation():
    session_store = InMemorySessionStore()

    # All test code is async
    session_id = await session_store.create(
        user_id="user:alice",
        username="alice",
        roles=["user"]
    )

    session = await session_store.get(session_id)
    assert session.user_id == "user:alice"

# pyproject.toml configuration
[tool.pytest.ini_options]
asyncio_mode = "strict"

Alternatives Considered

1. Synchronous Architecture (Traditional)

Description: Use synchronous Python with threaded workers (e.g., Gunicorn with threads)
Pros:
  • Simpler code (no async/await)
  • Easier debugging (linear stack traces)
  • More library compatibility
Cons:
  • Low throughput (~1-5 req/s per worker)
  • High memory usage (each thread = ~8MB stack)
  • GIL contention (threads compete for Global Interpreter Lock)
  • Poor scalability (need 100+ workers for moderate load)
Why Rejected: Cannot achieve production-grade performance for I/O-heavy workloads

2. Sync with Celery (Task Queue)

Description: Sync API offloads long tasks to Celery workers
Pros:
  • Sync API code (simpler)
  • Background processing
  • Retry logic built-in
Cons:
  • Additional infrastructure (Redis/RabbitMQ for queue)
  • Complexity (task serialization, result backends)
  • Latency (queueing overhead)
  • Not suitable for request-response (user waits for task)
Why Rejected: Adds complexity without solving core I/O concurrency issue

3. Threading (threading.Thread)

Description: Use Python threads for concurrency
Pros:
  • Familiar threading model
  • Standard library support
Cons:
  • GIL bottleneck (only one thread runs Python code at a time)
  • No benefit for CPU-bound tasks
  • High memory (stack per thread)
  • Complex synchronization (locks, deadlocks, race conditions)
Why Rejected: Threads release the GIL during I/O, but per-thread memory and synchronization overhead make them far less scalable than async for highly concurrent I/O workloads

4. Multiprocessing

Description: Use multiprocessing to fork worker processes
Pros:
  • True parallelism (no GIL)
  • Good for CPU-bound tasks
Cons:
  • High memory (full process copy per worker)
  • Slow startup (process forking overhead)
  • IPC complexity (sharing data between processes)
  • Not suitable for I/O-bound (overkill)
Why Rejected: Async is more efficient for I/O-heavy workloads

5. Hybrid (Sync API + Async I/O)

Description: Expose sync API but use async I/O internally
Pros:
  • Sync API (easier for users)
  • Async benefits internally
Cons:
  • Complexity (mixing paradigms)
  • Event loop management (who runs the loop?)
  • Testing confusion (sync tests calling async code)
Why Rejected: Complexity outweighs benefits; modern Python embraces async APIs
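
For illustration only, the rejected hybrid shape usually ends up looking like the sketch below (hypothetical names): every sync facade has to own an event loop, which breaks as soon as a loop is already running.
import asyncio

async def _fetch_session(session_id: str) -> dict:
    await asyncio.sleep(0)  # stands in for real async I/O
    return {"session_id": session_id}

def get_session_sync(session_id: str) -> dict:
    # The sync facade must start its own event loop; asyncio.run() raises
    # if one is already running (async frameworks, notebooks, tests) --
    # exactly the paradigm-mixing complexity cited above.
    return asyncio.run(_fetch_session(session_id))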

Performance Benchmarks

Throughput Comparison

Scenario: 100 concurrent LLM API calls (each takes 2 seconds)
| Architecture   | Total Time  | Throughput | Workers Needed |
| -------------- | ----------- | ---------- | -------------- |
| Sync (threads) | 200 seconds | 0.5 req/s  | 100            |
| Async          | 2 seconds   | 50 req/s   | 1              |
Result: 100x throughput improvement
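
The async column is easy to reproduce with a toy benchmark in which asyncio.sleep stands in for the 2-second LLM call (illustrative sketch, not a production benchmark):
import asyncio
import time

async def fake_llm_call() -> None:
    await asyncio.sleep(2)  # simulate a 2-second I/O-bound LLM request

async def main() -> None:
    start = time.perf_counter()
    await asyncio.gather(*(fake_llm_call() for _ in range(100)))
    print(f"100 concurrent calls in {time.perf_counter() - start:.1f}s")  # ~2s

asyncio.run(main())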

Memory Usage

| Architecture   | Memory per Worker       | 100 Concurrent Requests |
| -------------- | ----------------------- | ----------------------- |
| Sync (threads) | 8 MB × 100 threads      | 800 MB                  |
| Async          | 50 MB (single process)  | 50 MB                   |
Result: 16x memory efficiency

Real-World Metrics

From production deployments:
Sync Architecture:
- Workers: 32 (Gunicorn)
- Memory: 4 GB
- Throughput: 20 req/s
- CPU: 60% (GIL contention)

Async Architecture:
- Workers: 4 (Uvicorn)
- Memory: 512 MB
- Throughput: 200 req/s
- CPU: 25% (efficient I/O wait)

Integration Points

Uvicorn ASGI Server

# Start async server with 4 workers
uvicorn mcp_server_langgraph.mcp.server_streamable:app \
    --workers 4 \
    --host 0.0.0.0 \
    --port 8000
Why Uvicorn: Native async support, ASGI protocol, high performance

LangGraph Async Support

# core/agent.py
from langgraph.graph import StateGraph

# Build graph with async nodes
graph = StateGraph(AgentState)
graph.add_node("route", route_input)  # Async node
graph.add_node("llm", call_llm)       # Async node

# Async invocation
compiled = graph.compile()
result = await compiled.ainvoke({"messages": [user_message]})

Redis Async Client

# auth/session.py
import redis.asyncio as redis

class RedisSessionStore(SessionStore):
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url, decode_responses=True)

    async def create(self, user_id: str, ...) -> str:
        await self.redis.setex(session_key, ttl, session_data)
        return session_id

OpenFGA Async Client

# auth/openfga.py
from openfga_sdk import OpenFgaClient

class OpenFGAClient:
    async def check(self, user: str, relation: str, object: str) -> bool:
        response = await self.client.check(...)
        return response.allowed

Best Practices

1. Always Await Async Functions

# ❌ BAD: Forgot await (returns coroutine, not result)
result = async_function()

# ✅ GOOD: Properly awaited
result = await async_function()

2. Use asyncio.gather() for Concurrent I/O

# ❌ BAD: Sequential (slow)
result1 = await call1()
result2 = await call2()
result3 = await call3()

# ✅ GOOD: Concurrent (fast)
result1, result2, result3 = await asyncio.gather(
    call1(),
    call2(),
    call3()
)

3. Set Timeouts for I/O Operations

# Always use timeouts to prevent hanging
result = await asyncio.wait_for(external_api_call(), timeout=30)
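
On Python 3.11+, asyncio.timeout() is an equivalent option that scopes a deadline over a whole block (sketch reusing the hypothetical external_api_call from above):
# Python 3.11+: everything inside the block shares the 30-second deadline
async with asyncio.timeout(30):
    result = await external_api_call()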

4. Use Async Context Managers

async with httpx.AsyncClient() as client:
    await client.get("https://api.com")
    # Client auto-closed

Future Enhancements

  • Async Streaming: Stream LLM responses token-by-token (see the sketch after this list)
  • Async Background Tasks: Scheduled jobs with apscheduler async support
  • Async Batch Processing: Process large datasets with async workers
  • Structured Concurrency: Explore anyio for cleaner async patterns
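
For the streaming item above, a minimal sketch using LiteLLM's async streaming interface (assuming the OpenAI-style chunk shape that litellm.acompletion mirrors; the model name is a placeholder):
import litellm

async def stream_llm_response(prompt: str):
    # stream=True yields chunks as the model produces them
    response = await litellm.acompletion(
        model="gpt-4o-mini",  # placeholder model name
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in response:
        token = chunk.choices[0].delta.content
        if token:
            yield token  # forward tokens to the caller as they arrive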

References