Overview
OpenAI provides industry-leading language models in the GPT-5 series. This guide covers setup, configuration, and best practices for using OpenAI models with the MCP Server.
GPT-5 is OpenAI’s latest flagship model with enhanced reasoning, multimodal capabilities, and improved efficiency.
Available Models
| Model | Context Window | Use Case | Pricing (per 1M tokens) |
|---|---|---|---|
| gpt-5.1 | 128K tokens | Best all-around, latest flagship | $2.50 input / $10.00 output |
| gpt-5.1-pro | 128K tokens | Most capable, advanced reasoning | $10.00 input / $30.00 output |
| gpt-5-mini | 128K tokens | Fast and cost-effective | $0.15 input / $0.60 output |
| gpt-5.1-nano | 128K tokens | Smallest, fastest, ultra low-cost | $0.10 input / $0.40 output |
Recommended: gpt-5.1 for production (best performance/cost ratio)
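The table prices translate directly into per-request cost estimates. The sketch below shows one way to do that; the PRICING dictionary simply transcribes the table above, and `estimate_cost` is an illustrative helper, not part of the MCP Server API.
# Hypothetical helper: estimate request cost from the pricing table above.
# Prices are USD per 1M tokens (input, output); adjust if OpenAI's pricing changes.
PRICING = {
    "gpt-5.1":      (2.50, 10.00),
    "gpt-5.1-pro":  (10.00, 30.00),
    "gpt-5-mini":   (0.15, 0.60),
    "gpt-5.1-nano": (0.10, 0.40),
}

def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Rough cost estimate in USD for a single request."""
    input_price, output_price = PRICING[model]
    return (input_tokens / 1_000_000) * input_price + (output_tokens / 1_000_000) * output_price

# Example: a 2K-token prompt with a 500-token reply on gpt-5.1
print(f"${estimate_cost('gpt-5.1', 2_000, 500):.4f}")  # ≈ $0.0100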
Quick Start
Configure
Using Infisical (Recommended):
# Add to Infisical dashboard
OPENAI_API_KEY=sk-...your-key
# In .env, reference Infisical
INFISICAL_PROJECT_ID=your-project-id
INFISICAL_CLIENT_ID=your-client-id
INFISICAL_CLIENT_SECRET=your-client-secret
Using Environment Variables (Development only):
# .env
LLM_PROVIDER=openai
OPENAI_API_KEY=sk-...your-key
LLM_MODEL_NAME=gpt-5.1
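Whichever option you use, the server ultimately reads these values from the environment. A minimal startup check, sketched here as an illustration (not part of the MCP Server codebase), fails fast when the key is missing or malformed:
import os

# Hypothetical startup check: fail fast if the OpenAI key was not injected
# (by Infisical, the .env file, or the deployment environment).
def require_openai_config() -> None:
    api_key = os.getenv("OPENAI_API_KEY", "")
    if not api_key.startswith("sk-"):
        raise RuntimeError("OPENAI_API_KEY is missing or malformed; check Infisical or .env")
    if os.getenv("LLM_PROVIDER", "openai") != "openai":
        raise RuntimeError("LLM_PROVIDER must be 'openai' for this guide's configuration")

require_openai_config()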
Test
from mcp_server_langgraph.llm.factory import LLMFactory
llm = LLMFactory(
provider = "openai" ,
model_name = "gpt-5.1"
)
response = await llm.ainvoke( "Explain machine learning in simple terms" )
print (response.content)
Configuration Options
Basic Configuration
from mcp_server_langgraph.llm.factory import LLMFactory
llm = LLMFactory(
provider = "openai" ,
model_name = "gpt-5.1" ,
temperature = 1.0 , # 0.0 to 2.0
max_tokens = 4096 ,
timeout = 60
)
## Invoke
response = await llm.ainvoke( "What is quantum computing?" )
print (response.content)
Advanced Configuration
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(
model = "gpt-5.1" ,
openai_api_key = settings.openai_api_key,
# Generation parameters
temperature = 1.0 ,
max_tokens = 4096 ,
top_p = 1.0 ,
frequency_penalty = 0.0 ,
presence_penalty = 0.0 ,
# Timeouts
request_timeout = 60 ,
max_retries = 2 ,
# Streaming
streaming = True ,
# Organization (if applicable)
openai_organization = "org-..." ,
# Custom headers
default_headers = {
"OpenAI-Beta" : "assistants=v2"
}
)
Model Selection Strategy
def select_openai_model ( task_type : str , complexity : str ) -> str :
"""Select appropriate OpenAI model based on task"""
# Fast, simple tasks
if task_type == "chat" and complexity == "simple" :
return "gpt-5.1-nano" # Smallest, fastest, ultra low-cost
# Standard tasks
elif complexity == "low" :
return "gpt-5-mini" # Fast and cost-effective
# Multimodal or complex reasoning
elif task_type in [ "vision" , "multimodal" ] or complexity == "high" :
return "gpt-5.1" # Best all-around flagship
# Maximum intelligence and advanced reasoning
elif complexity == "expert" :
return "gpt-5.1-pro" # Most capable
# Default
return "gpt-5.1"
## Use dynamically
llm = LLMFactory(
provider = "openai" ,
model_name = select_openai_model( task_type = "chat" , complexity = "medium" )
)
Features
Tool Calling
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent
@tool
def get_current_weather ( location : str , unit : str = "fahrenheit" ) -> str :
"""Get the current weather in a given location.
Args:
location: The city and state, e.g. San Francisco, CA
unit: The unit of temperature, either "celsius" or "fahrenheit"
"""
# Implementation
return f "Weather in { location } : 72° { unit[ 0 ].upper() } , Sunny"
@tool
def search_web ( query : str ) -> str :
"""Search the web for current information.
Args:
query: The search query
"""
# Implementation
return f "Search results for: { query } "
## Create agent with tools
llm_with_tools = llm.bind_tools([get_current_weather, search_web])
agent = create_react_agent(
llm_with_tools,
tools = [get_current_weather, search_web]
)
## Run agent
result = await agent.ainvoke({
"messages" : [( "user" , "What's the weather in Tokyo and search for recent AI news" )]
})
print (result[ "messages" ][ - 1 ].content)
Vision (gpt-5.1 and gpt-5.1-pro)
import base64
from langchain_core.messages import HumanMessage
## Load image
with open ( "chart.png" , "rb" ) as f:
image_data = base64.b64encode(f.read()).decode()
## Create vision message
message = HumanMessage(
content = [
{
"type" : "text" ,
"text" : "What's in this image? Provide a detailed description."
},
{
"type" : "image_url" ,
"image_url" : {
"url" : f "data:image/png;base64, { image_data } " ,
"detail" : "high" # "low", "high", or "auto"
}
}
]
)
response = await llm.ainvoke([message])
print (response.content)
Structured Output (JSON Mode)
from pydantic import BaseModel, Field
from typing import List
class SentimentAnalysis ( BaseModel ):
"""Sentiment analysis result"""
sentiment: str = Field( description = "Overall sentiment: positive, negative, or neutral" )
confidence: float = Field( description = "Confidence score 0-1" )
key_phrases: List[ str ] = Field( description = "Key phrases that influenced the sentiment" )
## Force structured output
structured_llm = llm.with_structured_output(SentimentAnalysis)
text = "I absolutely love this product! It exceeded all my expectations."
response = await structured_llm.ainvoke(
f "Analyze the sentiment of this text: \n\n { text } "
)
print (response.model_dump_json( indent = 2 ))
## Output:
## {
## "sentiment": "positive",
## "confidence": 0.95,
## "key_phrases": ["absolutely love", "exceeded expectations"]
## }
Streaming
## Stream token-by-token
async def stream_response ( query : str ):
async for chunk in llm.astream(query):
print (chunk.content, end = "" , flush = True )
yield chunk.content
## Use in API
from fastapi.responses import StreamingResponse
@app.post ( "/message/stream" )
async def stream_message ( request : MessageRequest):
return StreamingResponse(
stream_response(request.query),
media_type = "text/event-stream"
)
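On the client side, the stream can be consumed incrementally. A minimal consumer sketch using httpx; the endpoint path and JSON payload shape are assumed from the example above, and the localhost URL is a placeholder for wherever the server runs:
import asyncio
import httpx

async def consume_stream(query: str) -> None:
    # Assumes the FastAPI app above is reachable locally on port 8000.
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/message/stream",
            json={"query": query},
        ) as response:
            async for chunk in response.aiter_text():
                print(chunk, end="", flush=True)

asyncio.run(consume_stream("Explain vector databases"))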
System Prompts
from langchain_core.prompts import ChatPromptTemplate
## Define system prompt
prompt = ChatPromptTemplate.from_messages([
( "system" , """You are a helpful AI assistant specializing in {domain} .
Guidelines:
- Provide accurate, well-researched information
- Cite sources when applicable
- Acknowledge uncertainty
- Use {style} communication style
Current date: {current_date} """ ),
( "user" , " {query} " )
])
## Create chain
from datetime import datetime
chain = prompt | llm
## Run
response = await chain.ainvoke({
"domain" : "software engineering" ,
"style" : "technical but accessible" ,
"current_date" : datetime.now().strftime( "%Y-%m- %d " ),
"query" : "Explain microservices architecture"
})
print (response.content)
## Force JSON output
llm_json = ChatOpenAI(
    model="gpt-5.1",
    model_kwargs={"response_format": {"type": "json_object"}}
)
response = await llm_json.ainvoke(
    "Extract person info as JSON: John Smith is a 35-year-old software engineer"
)
import json
data = json.loads(response.content)
print(data)
## Output: {"name": "John Smith", "age": 35, "occupation": "software engineer"}
Production Deployment
Kubernetes Configuration
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-server-langgraph
  namespace: mcp-server-langgraph
spec:
  replicas: 3
  template:
    spec:
      containers:
        - name: agent
          image: mcp-server-langgraph:latest
          env:
            # Use OpenAI
            - name: LLM_PROVIDER
              value: "openai"
            - name: LLM_MODEL_NAME
              value: "gpt-5.1"
            # API key from secret (loaded by Infisical operator)
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: mcp-server-langgraph-secrets
                  key: OPENAI_API_KEY
            # Optional: Organization ID
            - name: OPENAI_ORG_ID
              value: "org-..."
            # Performance tuning
            - name: LLM_TIMEOUT
              value: "60"
            - name: LLM_MAX_TOKENS
              value: "4096"
            - name: LLM_TEMPERATURE
              value: "1.0"
          resources:
            requests:
              cpu: 1000m
              memory: 1Gi
            limits:
              cpu: 4000m
              memory: 4Gi
Rate Limiting
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter( key_func = get_remote_address)
## OpenAI rate limits (varies by tier):
## Free tier: 3 RPM, 40K TPM
## Tier 1: 500 RPM, 200K TPM
## Tier 2: 5000 RPM, 2M TPM
@app.post ( "/message" )
@limiter.limit ( "100/minute" ) # Conservative
async def send_message ( request : Request):
response = await llm.ainvoke(request.query)
return { "response" : response.content}
## Per-user rate limiting
@limiter.limit ( "200/minute" , key_func = lambda : get_current_user().id)
async def send_message_authenticated ( user : User = Depends(get_current_user)):
pass
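slowapi caps requests per minute, but OpenAI also enforces tokens-per-minute (TPM) quotas. A rough client-side TPM guard can smooth bursts before they reach the API; this is a best-effort sketch, and the default limit is an assumption you should match to your account tier:
import asyncio
import time
from collections import deque

class TokenRateLimiter:
    """Best-effort sliding-window guard for a tokens-per-minute budget."""

    def __init__(self, tokens_per_minute: int = 200_000):
        self.tpm = tokens_per_minute
        self.window = deque()  # (timestamp, token_count) pairs
        self.lock = asyncio.Lock()

    async def acquire(self, tokens: int) -> None:
        while True:
            async with self.lock:
                now = time.monotonic()
                # Drop entries older than 60 seconds
                while self.window and now - self.window[0][0] > 60:
                    self.window.popleft()
                used = sum(count for _, count in self.window)
                if used + tokens <= self.tpm:
                    self.window.append((now, tokens))
                    return
            await asyncio.sleep(1)  # Wait for the window to clear

tpm_guard = TokenRateLimiter()

async def guarded_call(query: str):
    # count_tokens is defined in the Token Management section below
    await tpm_guard.acquire(count_tokens(query))
    return await llm.ainvoke(query)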
Error Handling
from openai import (
APIError,
APITimeoutError,
RateLimitError,
APIConnectionError
)
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type
)
@retry (
stop = stop_after_attempt( 3 ),
wait = wait_exponential( multiplier = 1 , min = 2 , max = 10 ),
retry = retry_if_exception_type((RateLimitError, APITimeoutError, APIConnectionError))
)
async def call_openai_with_retry(query: str):
    """Call OpenAI with automatic retry on transient errors"""
    try:
        return await llm.ainvoke(query)
    except RateLimitError as e:
        logger.warning(f"Rate limit hit: {e}")
        raise  # Will retry
    except APITimeoutError as e:
        logger.warning(f"Timeout: {e}")
        raise  # Will retry
    except APIConnectionError as e:
        logger.warning(f"Connection error: {e}")
        raise  # Will retry
    except APIError as e:
        logger.error(f"API error: {e}")
        if getattr(e, "status_code", 0) and e.status_code >= 500:
            raise  # Server error; surfaced to the caller (the decorator above only retries transient network errors)
        else:
            return None  # Client error, don't retry
## Use with fallback
async def call_with_fallback(query: str):
    try:
        return await call_openai_with_retry(query)
    except Exception as e:
        logger.error(f"OpenAI failed after retries: {e}")
        # Fallback to another provider
        fallback_llm = LLMFactory(provider="anthropic", model_name="claude-sonnet-4-5")
        return await fallback_llm.ainvoke(query)
Batching
## Process multiple requests efficiently
import asyncio
queries = [
"What is Python?" ,
"Explain async/await" ,
"What is FastAPI?"
]
## Concurrent processing
async def process_batch ( queries : List[ str ]):
tasks = [llm.ainvoke(q) for q in queries]
responses = await asyncio.gather( * tasks)
return responses
results = await process_batch(queries)
for query, response in zip(queries, results):
    print(f"Q: {query}\nA: {response.content}\n")
Token Management
import tiktoken
## Count tokens (accurate for OpenAI)
def count_tokens(text: str, model: str = "gpt-5.1") -> int:
    """Count tokens using tiktoken"""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # Fall back to a generic encoding if tiktoken doesn't know this model name yet
        encoding = tiktoken.get_encoding("o200k_base")
    return len(encoding.encode(text))
## Truncate to fit context
def truncate_to_tokens(text: str, max_tokens: int, model: str = "gpt-5.1") -> str:
    """Truncate text to fit token limit"""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        encoding = tiktoken.get_encoding("o200k_base")
    tokens = encoding.encode(text)
    if len(tokens) <= max_tokens:
        return text
    # Truncate and decode
    return encoding.decode(tokens[:max_tokens])
## Use before sending
query = truncate_to_tokens(long_query, max_tokens = 120000 ) # Leave room for response
response = await llm.ainvoke(query)
Caching
## Cache responses for identical queries (in-memory)
## Note: functools.lru_cache cannot wrap an async function (it would cache the
## coroutine object, which can only be awaited once), so use a plain dict instead.
_response_cache: dict = {}

async def cached_llm_call(query: str) -> str:
    if query in _response_cache:
        return _response_cache[query]
    response = await llm.ainvoke(query)
    _response_cache[query] = response.content
    return response.content
## Or use Redis for distributed caching
## (assumes an initialized redis.asyncio client available as `redis`)
import hashlib
import json

async def call_with_redis_cache(query: str):
    # Generate cache key
    cache_key = f"llm:{hashlib.sha256(query.encode()).hexdigest()}"
    # Check cache
    cached = await redis.get(cache_key)
    if cached:
        return json.loads(cached)
    # Call LLM
    response = await llm.ainvoke(query)
    # Cache for 1 hour
    await redis.setex(cache_key, 3600, json.dumps(response.content))
    return response.content
Cost Optimization
Model Selection
## Use cheaper model for simple tasks
def get_optimal_model ( complexity : str ) -> str :
if complexity == "simple" :
return "gpt-5.1-nano" # $0.10/$0.40 per 1M tokens (smallest, fastest)
elif complexity == "low" :
return "gpt-5-mini" # $0.15/$0.60 per 1M tokens (cost-effective)
elif complexity == "medium" :
return "gpt-5.1" # $2.50/$10.00 per 1M tokens (flagship)
else :
return "gpt-5.1-pro" # $10/$30 per 1M tokens (most capable)
llm = LLMFactory(
provider = "openai" ,
model_name = get_optimal_model(analyze_complexity(query))
)
Usage Tracking
from langchain.callbacks import get_openai_callback
## Track usage and costs
with get_openai_callback() as cb:
response = await llm.ainvoke(query)
print ( f "Tokens used: { cb.total_tokens } " )
print ( f "Prompt tokens: { cb.prompt_tokens } " )
print ( f "Completion tokens: { cb.completion_tokens } " )
print ( f "Total cost: $ { cb.total_cost } " )
## Store metrics
await store_usage_metrics({
"model" : "gpt-5.1" ,
"tokens" : cb.total_tokens,
"cost" : cb.total_cost,
"user_id" : user.id,
"timestamp" : datetime.utcnow()
})
Budget Limits
## Enforce per-user budgets
async def check_budget ( user_id : str , estimated_cost : float ):
"""Check if user has budget remaining"""
usage = await get_user_usage(user_id)
if usage.total_cost + estimated_cost > usage.budget_limit:
raise BudgetExceededError(
f "Budget exceeded: $ { usage.total_cost :.2f} / $ { usage.budget_limit :.2f} "
)
## Use before API call
@app.post ( "/message" )
async def send_message (
request : MessageRequest,
user : User = Depends(get_current_user)
):
# Estimate cost
input_tokens = count_tokens(request.query, model = "gpt-5.1" )
estimated_cost = (input_tokens / 1_000_000 ) * 2.50 # $2.50 per 1M input tokens
# Check budget
await check_budget(user.id, estimated_cost)
# Call LLM
response = await llm.ainvoke(request.query)
return { "response" : response.content}
Monitoring
LangSmith Integration
## Automatic tracking with LangSmith
import os
os.environ[ "LANGCHAIN_TRACING_V2" ] = "true"
os.environ[ "LANGCHAIN_PROJECT" ] = "langgraph-openai"
os.environ[ "LANGCHAIN_API_KEY" ] = settings.langsmith_api_key
## All LLM calls automatically tracked
response = await llm.ainvoke(query)
## View traces at https://smith.langchain.com
Custom Metrics
from prometheus_client import Counter, Histogram
## Define metrics
openai_requests = Counter(
'openai_requests_total' ,
'Total OpenAI API requests' ,
[ 'model' , 'status' ]
)
openai_latency = Histogram(
'openai_request_duration_seconds' ,
'OpenAI request latency' ,
[ 'model' ]
)
openai_tokens = Counter(
'openai_tokens_total' ,
'Total tokens used' ,
[ 'model' , 'type' ] # type: input or output
)
## Track metrics
import time
async def call_openai_with_metrics ( query : str ):
start_time = time.time()
try :
with get_openai_callback() as cb:
response = await llm.ainvoke(query)
# Record metrics
openai_requests.labels(
model = "gpt-5.1" ,
status = "success"
).inc()
openai_tokens.labels(
model = "gpt-5.1" ,
type = "input"
).inc(cb.prompt_tokens)
openai_tokens.labels(
model = "gpt-5.1" ,
type = "output"
).inc(cb.completion_tokens)
return response
except Exception as e:
openai_requests.labels(
model = "gpt-5.1" ,
status = "error"
).inc()
raise
finally :
duration = time.time() - start_time
openai_latency.labels( model = "gpt-5.1" ).observe(duration)
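If the agent runs inside the same FastAPI app shown earlier, these counters can be exposed for Prometheus to scrape. A minimal sketch; the /metrics mount path is a choice, not something the MCP Server mandates:
from prometheus_client import make_asgi_app

# Expose the counters and histogram defined above at /metrics
app.mount("/metrics", make_asgi_app())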
Troubleshooting
Error: 401 Incorrect API key provided
Solutions:
# Verify API key format
echo $OPENAI_API_KEY | head -c 10
# Should start with: sk-
# Test key
curl https://api.openai.com/v1/models \
-H "Authorization: Bearer $OPENAI_API_KEY "
# Regenerate key
# 1. Go to https://platform.openai.com/api-keys
# 2. Revoke old key
# 3. Create new key
# 4. Update in Infisical
Error: 429 Rate limit reached
Solutions:
# Check your usage tier
# https://platform.openai.com/account/limits
# Implement exponential backoff
from tenacity import retry, wait_exponential
@retry ( wait = wait_exponential( multiplier = 1 , min = 2 , max = 60 ))
async def call_with_backoff ( query : str ):
return await llm.ainvoke(query)
# Reduce rate
@limiter.limit ( "100/minute" )
async def send_message ( request : Request):
pass
# Request tier upgrade
# https://platform.openai.com/account/billing/limits
Error: This model's maximum context length is 128000 tokens
Solutions:
# Truncate input
max_input_tokens = 120000  # Leave room for response
truncated = truncate_to_tokens(long_text, max_input_tokens, model="gpt-5.1")
# Summarize first
summary_llm = LLMFactory(provider="openai", model_name="gpt-5-mini")
summary = await summary_llm.ainvoke(f"Summarize:\n\n{long_text}")
# Then use summary
response = await llm.ainvoke(f"Analyze this summary:\n\n{summary.content}")
Error: APITimeoutError: Request timed out
Solutions:
# Increase timeout
llm = ChatOpenAI(
model = "gpt-5.1" ,
request_timeout = 120 , # 2 minutes
max_retries = 3
)
# Reduce max_tokens
llm = ChatOpenAI(
model = "gpt-5.1" ,
max_tokens = 2048 # Smaller responses
)
# Use streaming for long responses
async for chunk in llm.astream(query):
yield chunk
Best Practices
- Never commit API keys to Git
- Use Infisical or similar secret manager
- Rotate API keys quarterly
- Monitor for unusual usage patterns
- Implement per-user rate limiting
- Validate and sanitize all inputs
- Track usage per user/team
- Set budget alerts (see the sketch after this list)
- Use cheaper models for simple tasks (gpt-5.1-nano, gpt-5-mini)
- Implement token limits
- Monitor and optimize prompt efficiency
- Use gpt-5-mini or gpt-5.1-nano for development/testing
- Implement exponential backoff
- Handle rate limits gracefully
- Add fallback to other providers
- Log all errors with context
- Monitor latency and error rates
- Set up alerting for failures
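A minimal budget-alert hook, sketched under the assumption that `get_user_usage` from the Budget Limits section is available; `notify_ops` is a placeholder for your own alerting channel (Slack webhook, PagerDuty, email, etc.):
ALERT_THRESHOLDS = (1.0, 0.8, 0.5)  # Fractions of the budget, highest first

async def check_budget_alerts(user_id: str) -> None:
    usage = await get_user_usage(user_id)  # Same helper used in Budget Limits
    spent_fraction = usage.total_cost / usage.budget_limit
    for threshold in ALERT_THRESHOLDS:
        if spent_fraction >= threshold:
            # notify_ops is a placeholder: wire it to your alerting stack
            await notify_ops(
                f"User {user_id} at {spent_fraction:.0%} of budget "
                f"(${usage.total_cost:.2f} / ${usage.budget_limit:.2f})"
            )
            break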
Next Steps
OpenAI Ready: Leverage GPT-5 series models in your MCP Server!