Documentation Index
Fetch the complete documentation index at: https://mcp-server-langgraph.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
Agent Architecture and Usage
This document describes the agent architecture used in MCP Server LangGraph and provides guidance for working with LangGraph agents and Pydantic AI integration.
Related Documentation: This guide covers agent architecture. For Claude Code workflow guidance, see CLAUDE.md.
Overview
MCP Server LangGraph implements a functional agent architecture using LangGraph for stateful conversation management and Pydantic AI for structured outputs and tool calling.
Note: This guide is placed at the repository root for maximum discoverability. For project-specific Claude Code workflow patterns, see CLAUDE.md.
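At runtime the pieces fit together as sketched below: the LLM factory builds a model, the model and tools are compiled into a LangGraph graph, and the graph is invoked per conversation thread. This is a minimal sketch using helpers documented later in this guide (create_llm_from_config, create_agent_graph, and the search_documents tool):
# Minimal end-to-end sketch; the helpers used here are introduced in later sections
from mcp_server_langgraph.llm.factory import create_llm_from_config
llm = create_llm_from_config(model_name="anthropic/claude-sonnet-4-5-20250929")
agent = create_agent_graph(llm, [search_documents])
response = await agent.ainvoke(
    {"messages": [("user", "Find the onboarding documents")]},
    config={"configurable": {"thread_id": "demo-thread"}},
)
print(response["messages"][-1].content)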
LangGraph Agent
Core Components
Located in: src/mcp_server_langgraph/core/agent.py
1. AgentState
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
"""State for the LangGraph agent"""
messages: Annotated[Sequence[BaseMessage], add_messages]
# Add custom state fields as needed
2. Agent Graph
from langgraph.graph import StateGraph, END
def create_agent_graph(llm, tools, checkpointer=None):
    """Create and compile the LangGraph agent graph"""
# Define nodes
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)
# Define edges
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
"agent",
should_continue,
{
"continue": "tools",
"end": END
}
)
workflow.add_edge("tools", "agent")
    return workflow.compile(checkpointer=checkpointer)
3. Conditional Routing
def should_continue(state: AgentState) -> str:
"""Determine if agent should continue or end"""
messages = state["messages"]
last_message = messages[-1]
# If LLM made tool calls, continue to tools node
if last_message.tool_calls:
return "continue"
# Otherwise end
return "end"
Stateful Conversation
LangGraph maintains conversation state through checkpointing:
from langgraph.checkpoint.memory import MemorySaver
# In-memory checkpointing (development)
memory = MemorySaver()
agent_graph = create_agent_graph(llm, tools, checkpointer=memory)
# Persistent checkpointing (production)
from langgraph.checkpoint.postgres import PostgresSaver
# from_conn_string returns a context manager in current LangGraph versions
with PostgresSaver.from_conn_string("postgresql://user:pass@localhost/db") as checkpointer:
    checkpointer.setup()  # create the checkpoint tables on first use
    agent_graph = create_agent_graph(llm, tools, checkpointer=checkpointer)
LangGraph handles tool execution automatically:
from langchain_core.tools import tool
@tool
def search_documents(query: str) -> str:
"""Search internal documents"""
    results = "..."  # implementation goes here
    return results
tools = [search_documents]
agent = create_agent_graph(llm, tools)
Pydantic AI Integration
Overview
Located in: src/mcp_server_langgraph/llm/pydantic_agent.py
Pydantic AI provides:
- Structured outputs: Type-safe responses with Pydantic models
- Model abstraction: Unified interface across LLM providers
- Tool integration: Function calling with validation
- Streaming support: Token-by-token streaming
Agent Creation
from pydantic_ai import Agent
from pydantic import BaseModel
class SearchResult(BaseModel):
"""Structured search result"""
title: str
summary: str
relevance_score: float
# Create agent with structured output
search_agent = Agent(
model="openai:gpt-4",
result_type=SearchResult,
system_prompt="You are a helpful search assistant"
)
# Run agent
result = await search_agent.run("Search for Python tutorials")
# result.data is a SearchResult instance
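Streaming is also supported; below is a minimal sketch of token-by-token streaming with run_stream, using a hypothetical plain-text agent (streaming a structured result_type works slightly differently):
from pydantic_ai import Agent
# Hypothetical plain-text agent used only for this streaming sketch
assistant_agent = Agent(
    model="openai:gpt-4",
    system_prompt="You are a helpful assistant"
)
async def stream_answer(question: str):
    # run_stream yields partial output while the model is still generating
    async with assistant_agent.run_stream(question) as stream:
        async for delta in stream.stream_text(delta=True):
            yield delta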
from pydantic_ai import RunContext
@search_agent.tool
async def search_database(
ctx: RunContext[dict],
query: str,
limit: int = 10
) -> list[dict]:
"""Search the document database"""
# Access context
user_id = ctx.deps.get("user_id")
# Implementation
results = await db.search(query, limit=limit, user_id=user_id)
return results
Model Switching
Pydantic AI supports dynamic model switching:
from pydantic_ai.models import KnownModelName
# Switch between models
agent = Agent(
model="openai:gpt-4", # Default
)
# Runtime override
result = await agent.run(
"Your query",
model="anthropic:claude-sonnet-4-5-20250929" # Override
)
Structured Output Examples
from pydantic import BaseModel, Field
from typing import List
class CodeReview(BaseModel):
"""Structured code review output"""
issues: List[str] = Field(description="List of issues found")
suggestions: List[str] = Field(description="Improvement suggestions")
security_concerns: List[str] = Field(description="Security issues")
rating: int = Field(ge=1, le=10, description="Code quality rating")
review_agent = Agent(
model="anthropic:claude-sonnet-4-5-20250929",
result_type=CodeReview,
system_prompt="You are an expert code reviewer"
)
# Get structured review
review = await review_agent.run(code_snippet)
# review.data.issues, review.data.suggestions are fully typed
Agent Configuration
LLM Selection
From: src/mcp_server_langgraph/llm/factory.py
from mcp_server_langgraph.llm.factory import create_llm_from_config
# Configuration via settings
llm = create_llm_from_config(
model_name="anthropic/claude-sonnet-4-5-20250929",
temperature=0.7,
max_tokens=4096,
fallback_models=["openai/gpt-4", "google/gemini-pro"]
)
Environment Variables
# Primary model
export MODEL_NAME="anthropic/claude-sonnet-4-5-20250929"
export ANTHROPIC_API_KEY="sk-ant-..."
# Fallback models
export FALLBACK_MODELS="openai/gpt-4,google/gemini-pro"
export OPENAI_API_KEY="sk-..."
export GOOGLE_API_KEY="..."
# Model parameters
export TEMPERATURE=0.7
export MAX_TOKENS=4096
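These variables are typically loaded into a settings object at startup. The sketch below shows one way they could map onto a pydantic-settings model; the ModelSettings class is hypothetical and stands in for the project's real settings module:
# Hypothetical sketch of mapping the variables above onto pydantic-settings
from pydantic_settings import BaseSettings, SettingsConfigDict
class ModelSettings(BaseSettings):
    model_config = SettingsConfigDict(protected_namespaces=())
    # Field names map to MODEL_NAME, FALLBACK_MODELS, TEMPERATURE, MAX_TOKENS
    model_name: str = "anthropic/claude-sonnet-4-5-20250929"
    fallback_models: str = "openai/gpt-4,google/gemini-pro"
    temperature: float = 0.7
    max_tokens: int = 4096
settings = ModelSettings()
fallbacks = [m.strip() for m in settings.fallback_models.split(",") if m.strip()]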
Multi-Model Fallback
from mcp_server_langgraph.llm.factory import LLMFactory
factory = LLMFactory(
primary_model="anthropic/claude-sonnet-4-5-20250929",
fallback_models=[
"openai/gpt-4",
"google/gemini-pro",
"ollama/llama3.1"
]
)
# Automatic fallback on failure
llm = factory.create_with_fallback()
MCP (Model Context Protocol) tools are exposed via the MCP server:
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool
server = Server("langgraph-agent")
@server.list_tools()
async def list_tools():
"""List available tools"""
return [
Tool(
name="search_documents",
description="Search internal documents",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string"},
"limit": {"type": "integer"}
},
"required": ["query"]
}
)
]
@server.call_tool()
async def call_tool(name: str, arguments: dict):
"""Execute tool"""
    if name == "search_documents":
        return await search_documents(**arguments)
    raise ValueError(f"Unknown tool: {name}")
Add custom tools to the agent:
from langchain_core.tools import StructuredTool
def calculate_metrics(data: dict) -> dict:
"""Calculate performance metrics"""
    metrics = {}  # implementation goes here
    return metrics
tools = [
StructuredTool.from_function(
func=calculate_metrics,
name="calculate_metrics",
description="Calculate performance metrics from data"
)
]
agent = create_agent_graph(llm, tools)
Tools respect OpenFGA permissions:
from mcp_server_langgraph.auth.middleware import AuthMiddleware
auth = AuthMiddleware(openfga_client=openfga)
@server.call_tool()
async def call_tool(name: str, arguments: dict, request):
"""Execute tool with authorization"""
user_id = request.user_id
# Check permission
allowed = await auth.check_authorization(
user_id=user_id,
relation="execute",
object=f"tool:{name}"
)
if not allowed:
raise PermissionError(f"User {user_id} cannot execute {name}")
    # Execute tool (here `tools` is a mapping of tool name -> callable)
    return await tools[name](**arguments)
State Management
Conversation Memory
LangGraph manages conversation history:
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
agent = create_agent_graph(llm, tools, checkpointer=memory)
# Each conversation has unique thread_id
config = {"configurable": {"thread_id": "user-123-conv-456"}}
# First message
response = await agent.ainvoke(
{"messages": [("user", "Hello")]},
config=config
)
# Continues previous conversation
response = await agent.ainvoke(
{"messages": [("user", "What did I just say?")]},
config=config
)
# Agent remembers "Hello"
Persistent State
For production, use PostgreSQL checkpointing:
import os
from langgraph.checkpoint.postgres import PostgresSaver
# from_conn_string returns a context manager in current LangGraph versions
with PostgresSaver.from_conn_string(os.getenv("POSTGRES_URL")) as checkpointer:
    checkpointer.setup()  # create the checkpoint tables on first use
    agent = create_agent_graph(llm, tools, checkpointer=checkpointer)
State Schema
Define custom state fields:
class CustomAgentState(TypedDict):
"""Extended agent state"""
messages: Annotated[Sequence[BaseMessage], add_messages]
user_id: str # Track user
context: dict # Additional context
step_count: int # Track iterations
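A node reads these fields from the state it receives and updates them by including them in the dict it returns; a minimal sketch (the counting logic is illustrative, not the project's implementation):
async def counting_agent_node(state: CustomAgentState):
    """Agent node that also tracks how many steps have run"""
    result = await llm.ainvoke(state["messages"])
    # Fields without a reducer (user_id, context, step_count) are overwritten on update
    return {
        "messages": [result],
        "step_count": state.get("step_count", 0) + 1,
    }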
Best Practices
1. Error Handling
from langchain_core.messages import AIMessage
async def agent_node(state: AgentState):
"""Agent node with error handling"""
try:
result = await llm.ainvoke(state["messages"])
return {"messages": [result]}
except Exception as e:
logger.error(f"Agent error: {e}", exc_info=True)
metrics.agent_errors.add(1, {"error_type": type(e).__name__})
# Return error message to user
error_msg = AIMessage(
content=f"I encountered an error: {str(e)}"
)
return {"messages": [error_msg]}
2. Rate Limiting
from langchain_anthropic import ChatAnthropic
from langchain_core.rate_limiters import InMemoryRateLimiter
rate_limiter = InMemoryRateLimiter(
requests_per_second=10,
check_every_n_seconds=0.1
)
llm = ChatAnthropic(
model="claude-sonnet-4-5-20250929",
rate_limiter=rate_limiter
)
3. Streaming Responses
async def stream_agent_response(query: str, thread_id: str):
    """Stream agent responses token by token"""
    config = {"configurable": {"thread_id": thread_id}}
    # stream_mode="messages" yields (token_chunk, metadata) tuples as the LLM generates
    async for token, metadata in agent.astream(
        {"messages": [("user", query)]},
        config=config,
        stream_mode="messages",
    ):
        if token.content:
            yield token.content
4. Input Validation
from langchain_core.tools import tool
from pydantic import BaseModel, field_validator
class SearchInput(BaseModel):
    """Validated search input"""
    query: str
    limit: int = 10
    @field_validator("query")
    @classmethod
    def query_not_empty(cls, v):
        if not v.strip():
            raise ValueError("Query cannot be empty")
        return v
    @field_validator("limit")
    @classmethod
    def limit_in_range(cls, v):
        if v < 1 or v > 100:
            raise ValueError("Limit must be between 1 and 100")
        return v
@tool(args_schema=SearchInput)
def search(query: str, limit: int = 10) -> str:
    """Search with validation"""
    # Inputs are validated by Pydantic before the tool runs
    results = "..."  # implementation goes here
    return results
5. Observability
import time
from mcp_server_langgraph.observability.telemetry import tracer, metrics
async def agent_node(state: AgentState):
"""Agent node with observability"""
with tracer.start_as_current_span("agent.invoke") as span:
span.set_attribute("message_count", len(state["messages"]))
start_time = time.time()
try:
result = await llm.ainvoke(state["messages"])
duration_ms = (time.time() - start_time) * 1000
metrics.agent_duration.record(duration_ms)
metrics.agent_invocations.add(1, {"status": "success"})
return {"messages": [result]}
except Exception as e:
metrics.agent_invocations.add(1, {"status": "error"})
raise
6. Testing Agents
import pytest
from unittest.mock import AsyncMock
from langchain_core.messages import AIMessage
@pytest.mark.asyncio
async def test_agent_tool_calling():
    """Test agent calls tools correctly"""
    # Mock LLM: first response requests a tool call, second response ends the run
    mock_llm = AsyncMock()
    mock_llm.ainvoke.side_effect = [
        AIMessage(
            content="",
            tool_calls=[
                {
                    "name": "search_documents",
                    "args": {"query": "Python", "limit": 5},
                    "id": "call_123"
                }
            ]
        ),
        AIMessage(content="Here is what I found."),
    ]
# Mock tool
mock_tool = AsyncMock(return_value="Search results...")
# Create agent
agent = create_agent_graph(mock_llm, [mock_tool])
# Test
result = await agent.ainvoke(
{"messages": [("user", "Search for Python")]}
)
# Verify
assert mock_tool.called
assert mock_tool.call_args[1]["query"] == "Python"
Performance Optimization
1. Token Usage
Monitor token usage to optimize costs:
from langchain_community.callbacks import get_openai_callback
with get_openai_callback() as cb:
result = await agent.ainvoke({"messages": messages})
logger.info(f"Tokens used: {cb.total_tokens}")
logger.info(f"Cost: ${cb.total_cost:.4f}")
2. Caching
Use caching for repeated queries:
from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache
set_llm_cache(InMemoryCache())
# Repeated queries use cache
result1 = await llm.ainvoke("What is Python?")
result2 = await llm.ainvoke("What is Python?") # Cached
3. Parallel Tool Execution
Execute independent tools in parallel:
import asyncio
from langchain_core.messages import ToolMessage
async def tool_node(state: AgentState):
    """Execute tools in parallel when possible"""
    tool_calls = state["messages"][-1].tool_calls
    # Launch every requested tool call concurrently (execute_tool dispatches by name)
    tasks = [
        execute_tool(call["name"], call["args"])
        for call in tool_calls
    ]
    results = await asyncio.gather(*tasks)
    # Wrap raw outputs as ToolMessages so the agent node can consume them
    return {
        "messages": [
            ToolMessage(content=str(result), tool_call_id=call["id"])
            for call, result in zip(tool_calls, results)
        ]
    }
Resources
Last Updated: 2025-10-14
LangGraph Version: 0.6.10 (upgraded from 0.2.28)
Pydantic AI Version: 0.0.15