title: "Large Language Models (LLMs) Integration: Building Applications with OpenAI, Claude, and Beyond" date: 2025-01-09 category: "AI & Machine Learning" tags: ["LLMs", "OpenAI", "Claude", "AI Integration", "API Development", "GPT", "Anthropic"] author: "Tech Writer" seo_description: "Master the art of integrating Large Language Models into your applications. Learn practical techniques for working with OpenAI, Claude, and other LLM APIs, including best practices for production deployments." seo_keywords: "LLM integration, OpenAI API, Claude API, GPT integration, language models, AI applications, LLM development, API best practices"
Six months ago, I was tasked with adding "AI features" to our SaaS product. The CEO had seen ChatGPT and wanted "that, but in our app." Sound familiar? What started as a vague request turned into a deep dive into the world of LLM integration, teaching me lessons that no documentation could have prepared me for.
Today, our application seamlessly leverages multiple LLMs for different tasks, handles millions of requests daily, and most importantly, provides genuine value to users. Here's everything I learned about integrating LLMs into production applications.
Table of Contents
- title: "Large Language Models (LLMs) Integration: Building Applications with OpenAI, Claude, and Beyond"date: 2025-01-09category: "AI & Machine Learning"tags: ["LLMs", "OpenAI", "Claude", "AI Integration", "API Development", "GPT", "Anthropic"]author: "Tech Writer"seo_description: "Master the art of integrating Large Language Models into your applications. Learn practical techniques for working with OpenAI, Claude, and other LLM APIs, including best practices for production deployments."seo_keywords: "LLM integration, OpenAI API, Claude API, GPT integration, language models, AI applications, LLM development, API best practices"
- Understanding the LLM Landscape
- Getting Started: Your First LLM Integration
- Advanced Integration Patterns
- Production Considerations
- Best Practices and Lessons Learned
- The Future of LLM Integration
Understanding the LLM Landscape
The Major Players and Their Strengths
OpenAI (GPT-4, GPT-3.5)
- Strengths: General purpose, creative tasks, code generation
- Best for: Versatile applications, rapid prototyping
- Considerations: Cost at scale, rate limits
Anthropic (Claude)
- Strengths: Large context windows, nuanced reasoning, safety-focused
- Best for: Document analysis, complex conversations, ethical AI applications
- Considerations: Newer ecosystem, different prompt engineering approach
Google (PaLM, Gemini)
- Strengths: Multimodal capabilities, integration with Google services
- Best for: Applications needing image understanding, Google ecosystem integration
- Considerations: API stability, regional availability
Open Source (LLaMA, Mistral, etc.)
- Strengths: Self-hosting, customization, no API costs
- Best for: Privacy-sensitive applications, specialized fine-tuning
- Considerations: Infrastructure requirements, model quality variations
Getting Started: Your First LLM Integration
Let's build a practical example - a customer support assistant that can understand queries and provide helpful responses.
Setting Up Multiple LLM Providers
import os
import time
from typing import Dict, List, Optional
from dataclasses import dataclass
import openai
from anthropic import Anthropic
import google.generativeai as genai
from tenacity import retry, stop_after_attempt, wait_exponential
@dataclass
class LLMResponse:
content: str
model: str
usage: Dict[str, int]
cost: float
latency: float
class LLMProvider:
def __init__(self):
self.providers = self._initialize_providers()
def _initialize_providers(self):
providers = {}
# OpenAI
if os.getenv('OPENAI_API_KEY'):
openai.api_key = os.getenv('OPENAI_API_KEY')
providers['openai'] = self._create_openai_client()
# Anthropic
if os.getenv('ANTHROPIC_API_KEY'):
providers['anthropic'] = Anthropic(
api_key=os.getenv('ANTHROPIC_API_KEY')
)
# Google
if os.getenv('GOOGLE_API_KEY'):
genai.configure(api_key=os.getenv('GOOGLE_API_KEY'))
providers['google'] = genai
return providers
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def complete(
self,
prompt: str,
provider: str = 'openai',
model: Optional[str] = None,
temperature: float = 0.7,
max_tokens: int = 1000,
**kwargs
) -> LLMResponse:
start_time = time.time()
if provider == 'openai':
response = await self._openai_complete(
prompt, model or 'gpt-4', temperature, max_tokens, **kwargs
)
elif provider == 'anthropic':
response = await self._anthropic_complete(
prompt, model or 'claude-3-opus-20240229', temperature, max_tokens, **kwargs
)
elif provider == 'google':
response = await self._google_complete(
prompt, model or 'gemini-pro', temperature, max_tokens, **kwargs
)
else:
raise ValueError(f"Unknown provider: {provider}")
latency = time.time() - start_time
return LLMResponse(
content=response['content'],
model=response['model'],
usage=response['usage'],
cost=self._calculate_cost(response['usage'], response['model']),
latency=latency
)
async def _openai_complete(self, prompt, model, temperature, max_tokens, **kwargs):
# Note: this targets the pre-1.0 openai SDK; the 1.x client uses client.chat.completions.create instead
response = await openai.ChatCompletion.acreate(
model=model,
messages=[{"role": "user", "content": prompt}],
temperature=temperature,
max_tokens=max_tokens,
**kwargs
)
return {
'content': response.choices[0].message.content,
'model': model,
'usage': {
'prompt_tokens': response.usage.prompt_tokens,
'completion_tokens': response.usage.completion_tokens,
'total_tokens': response.usage.total_tokens
}
}
def _calculate_cost(self, usage: Dict[str, int], model: str) -> float:
# Pricing as of 2025 - always check current pricing
pricing = {
'gpt-4': {'prompt': 0.03, 'completion': 0.06},
'gpt-3.5-turbo': {'prompt': 0.001, 'completion': 0.002},
'claude-3-opus-20240229': {'prompt': 0.015, 'completion': 0.075},
'claude-3-sonnet-20240229': {'prompt': 0.003, 'completion': 0.015},
'gemini-pro': {'prompt': 0.001, 'completion': 0.002}
}
if model not in pricing:
return 0.0
prompt_cost = (usage['prompt_tokens'] / 1000) * pricing[model]['prompt']
completion_cost = (usage['completion_tokens'] / 1000) * pricing[model]['completion']
return prompt_cost + completion_cost
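To make the wiring concrete, here is a minimal usage sketch, assuming the API keys are set as environment variables and that the client is the LLMProvider defined above:

import asyncio

async def main():
    provider = LLMProvider()

    # Ask the default provider (OpenAI) a question and inspect cost/latency
    response = await provider.complete(
        prompt="Summarize our refund policy in two sentences.",
        provider='openai',
        model='gpt-3.5-turbo',
        max_tokens=200
    )
    print(response.content)
    print(f"model={response.model} tokens={response.usage['total_tokens']} "
          f"cost=${response.cost:.4f} latency={response.latency:.2f}s")

asyncio.run(main())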
Building a Smart Router for Cost and Performance Optimization
Not all queries need GPT-4. Here's how to route intelligently:
from collections import defaultdict
import logging

logger = logging.getLogger(__name__)

class SmartLLMRouter:
def __init__(self, providers: LLMProvider):
self.providers = providers
self.routing_rules = self._initialize_routing_rules()
self.performance_history = defaultdict(list)
def _initialize_routing_rules(self):
return {
'simple_query': {
'providers': ['openai'],
'model': 'gpt-3.5-turbo',
'max_tokens': 150
},
'code_generation': {
'providers': ['openai', 'anthropic'],
'model': 'gpt-4',
'max_tokens': 2000
},
'document_analysis': {
'providers': ['anthropic'],
'model': 'claude-3-opus-20240229',
'max_tokens': 4000
},
'creative_writing': {
'providers': ['openai', 'anthropic'],
'model': 'gpt-4',
'temperature': 0.9,
'max_tokens': 1500
}
}
async def route_request(
self,
prompt: str,
task_type: Optional[str] = None,
requirements: Optional[Dict] = None
) -> LLMResponse:
# Classify task if not provided
if not task_type:
task_type = await self._classify_task(prompt)
# Get routing configuration (copy it so per-request overrides don't mutate the shared rules)
route_config = dict(self.routing_rules.get(task_type, self.routing_rules['simple_query']))
# Apply any specific requirements
if requirements:
route_config.update(requirements)
# Try providers in order of preference
for provider in route_config['providers']:
try:
response = await self.providers.complete(
prompt=prompt,
provider=provider,
model=route_config.get('model'),
temperature=route_config.get('temperature', 0.7),
max_tokens=route_config.get('max_tokens', 1000)
)
# Track performance
self._track_performance(provider, task_type, response)
return response
except Exception as e:
logger.warning(f"Provider {provider} failed: {e}")
continue
raise Exception("All providers failed")
async def _classify_task(self, prompt: str) -> str:
classification_prompt = f"""
Classify this user request into one of these categories:
- simple_query: Basic questions, simple requests
- code_generation: Programming, code writing requests
- document_analysis: Long text analysis, summarization
- creative_writing: Stories, creative content
User request: {prompt[:200]}
Category:
"""
response = await self.providers.complete(
prompt=classification_prompt,
provider='openai',
model='gpt-3.5-turbo',
temperature=0,
max_tokens=10
)
return response.content.strip().lower()
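From the caller's side, routing then becomes a one-liner. A quick sketch, assuming the LLMProvider and SmartLLMRouter classes above:

import asyncio

async def handle_support_ticket(text: str):
    router = SmartLLMRouter(LLMProvider())

    # Let the router classify the request and pick a provider/model
    response = await router.route_request(prompt=text)

    # Or force a known task type and override limits for a heavier workload
    analysis = await router.route_request(
        prompt=text,
        task_type='document_analysis',
        requirements={'max_tokens': 2000}
    )
    return response.content, analysis.content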
Advanced Integration Patterns
Streaming Responses for Better UX
Users hate waiting. Here's how to stream responses:
class LLMStreamer {
constructor(apiKey) {
this.apiKey = apiKey;
this.eventSource = null;
}
async streamCompletion(prompt, onChunk, onComplete, onError) {
try {
const response = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${this.apiKey}`
},
body: JSON.stringify({
model: 'gpt-4',
messages: [{ role: 'user', content: prompt }],
stream: true
})
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
onComplete();
return;
}
try {
const parsed = JSON.parse(data);
const content = parsed.choices[0]?.delta?.content;
if (content) {
onChunk(content);
}
} catch (e) {
console.error('Parse error:', e);
}
}
}
}
} catch (error) {
onError(error);
}
}
}

// React component example: a standalone function component that receives an
// LLMStreamer instance as a prop (assumes: import { useState } from 'react')
function StreamingChat({ streamer }) {
const [messages, setMessages] = useState([]);
const [currentResponse, setCurrentResponse] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const sendMessage = async (userMessage) => {
setMessages(prev => [...prev, { role: 'user', content: userMessage }]);
setIsStreaming(true);
setCurrentResponse('');
// Accumulate locally to avoid reading a stale currentResponse from the closure
let fullResponse = '';
await streamer.streamCompletion(
userMessage,
(chunk) => {
fullResponse += chunk;
setCurrentResponse(fullResponse);
},
() => {
setMessages(prev => [...prev, {
role: 'assistant',
content: fullResponse
}]);
setCurrentResponse('');
setIsStreaming(false);
},
(error) => {
console.error('Streaming error:', error);
setIsStreaming(false);
}
);
};
return (
<div className="chat-container">
<div className="messages">
{messages.map((msg, idx) => (
<Message key={idx} {...msg} />
))}
{isStreaming && (
<div className="streaming-message">
<TypewriterEffect text={currentResponse} />
</div>
)}
</div>
<ChatInput onSend={sendMessage} disabled={isStreaming} />
</div>
);
}
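The same pattern works server-side. Here is a rough Python counterpart using the pre-1.0 openai SDK's streaming mode (the response shape differs slightly in the 1.x client, so treat this as a sketch):

import openai

def stream_completion(prompt: str):
    # stream=True yields incremental chunks instead of a single final message
    for chunk in openai.ChatCompletion.create(
        model='gpt-3.5-turbo',
        messages=[{"role": "user", "content": prompt}],
        stream=True
    ):
        delta = chunk.choices[0].delta
        content = delta.get('content')
        if content:
            yield content  # forward to SSE, a WebSocket, or the terminal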
Context Management for Long Conversations
LLMs have token limits. Here's how to manage context intelligently:
class ContextManager:
def __init__(self, max_tokens: int = 8000, model: str = 'gpt-4'):
self.max_tokens = max_tokens
self.model = model
self.token_buffer = 1000 # Reserve tokens for response
def manage_context(
self,
messages: List[Dict[str, str]],
system_prompt: Optional[str] = None
) -> List[Dict[str, str]]:
# Start with system prompt
managed_messages = []
total_tokens = 0
if system_prompt:
system_msg = {"role": "system", "content": system_prompt}
total_tokens += self._count_tokens(system_msg['content'])
managed_messages.append(system_msg)
# Always include the latest user message
if messages and messages[-1]['role'] == 'user':
latest_tokens = self._count_tokens(messages[-1]['content'])
if total_tokens + latest_tokens > self.max_tokens - self.token_buffer:
# Summarize if even the latest message is too long
messages[-1] = self._summarize_message(messages[-1])
latest_tokens = self._count_tokens(messages[-1]['content'])
total_tokens += latest_tokens
# Add historical messages in reverse order
available_tokens = self.max_tokens - self.token_buffer  # fixed budget; the running total_tokens is compared against this below
historical_messages = []
for msg in reversed(messages[:-1]):
msg_tokens = self._count_tokens(msg['content'])
if total_tokens + msg_tokens > available_tokens:
# Try to summarize older messages
if len(historical_messages) > 4: # Keep recent context detailed
summarized = self._summarize_message(msg)
msg_tokens = self._count_tokens(summarized['content'])
if total_tokens + msg_tokens <= available_tokens:
historical_messages.insert(0, summarized)
total_tokens += msg_tokens
break
else:
historical_messages.insert(0, msg)
total_tokens += msg_tokens
# Combine all messages
managed_messages.extend(historical_messages)
if messages and messages[-1]['role'] == 'user':
managed_messages.append(messages[-1])
return managed_messages
def _count_tokens(self, text: str) -> int:
# Use tiktoken for accurate counting
import tiktoken
encoder = tiktoken.encoding_for_model(self.model)
return len(encoder.encode(text))
def _summarize_message(self, message: Dict[str, str]) -> Dict[str, str]:
# In production, you'd use an LLM to summarize
# This is a simple truncation for example
max_chars = 500
content = message['content']
if len(content) > max_chars:
content = content[:max_chars] + "... [truncated]"
return {
"role": message['role'],
"content": f"[Summarized] {content}"
}
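In practice, every conversation runs through the manager right before the model call. A sketch, assuming the ContextManager above and a stored message history:

manager = ContextManager(max_tokens=8000, model='gpt-4')

conversation = [
    {"role": "user", "content": "How do I reset my password?"},
    {"role": "assistant", "content": "Go to Settings > Security and click 'Reset password'."},
    {"role": "user", "content": "I did that but never received the email."},
]

trimmed = manager.manage_context(
    conversation,
    system_prompt="You are a concise, friendly support assistant."
)
# `trimmed` now fits within the model's window (minus the response buffer)
# and can be passed straight to the chat completion call.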
Implementing Function Calling
Modern LLMs can call functions. Here's how to implement it safely:
import json
import logging

logger = logging.getLogger(__name__)

class FunctionCallingHandler:
def __init__(self):
self.available_functions = {
'get_weather': self.get_weather,
'search_database': self.search_database,
'send_email': self.send_email,
'calculate': self.calculate
}
self.function_schemas = {
'get_weather': {
'name': 'get_weather',
'description': 'Get current weather for a location',
'parameters': {
'type': 'object',
'properties': {
'location': {
'type': 'string',
'description': 'City name or coordinates'
},
'units': {
'type': 'string',
'enum': ['celsius', 'fahrenheit'],
'description': 'Temperature units'
}
},
'required': ['location']
}
},
'search_database': {
'name': 'search_database',
'description': 'Search internal database for information',
'parameters': {
'type': 'object',
'properties': {
'query': {
'type': 'string',
'description': 'Search query'
},
'filters': {
'type': 'object',
'description': 'Optional filters'
}
},
'required': ['query']
}
}
}
async def handle_function_call(self, function_call):
function_name = function_call.get('name')
function_args = json.loads(function_call.get('arguments', '{}'))
# Security: Validate function exists and is allowed
if function_name not in self.available_functions:
raise ValueError(f"Function {function_name} not available")
# Security: Validate arguments
if not self._validate_arguments(function_name, function_args):
raise ValueError(f"Invalid arguments for {function_name}")
# Execute function with error handling
try:
result = await self.available_functions[function_name](**function_args)
return {
'role': 'function',
'name': function_name,
'content': json.dumps(result)
}
except Exception as e:
logger.error(f"Function {function_name} failed: {e}")
return {
'role': 'function',
'name': function_name,
'content': json.dumps({
'error': str(e),
'status': 'failed'
})
}
def _validate_arguments(self, function_name: str, args: dict) -> bool:
schema = self.function_schemas.get(function_name)
if not schema:
return False
# Validate required parameters
required = schema['parameters'].get('required', [])
for param in required:
if param not in args:
return False
# Validate parameter types
properties = schema['parameters'].get('properties', {})
for key, value in args.items():
if key not in properties:
return False # Unexpected parameter
expected_type = properties[key].get('type')
if not self._check_type(value, expected_type):
return False
return True
def _check_type(self, value, expected_type) -> bool:
# Basic JSON Schema type check; unknown or missing types are allowed through
type_map = {'string': str, 'number': (int, float), 'integer': int, 'boolean': bool, 'object': dict, 'array': list}
if expected_type is None:
return True
return isinstance(value, type_map.get(expected_type, object))
async def get_weather(self, location: str, units: str = 'celsius'):
# Simulated weather API call
return {
'location': location,
'temperature': 22 if units == 'celsius' else 72,
'conditions': 'Partly cloudy',
'humidity': 65
}
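The handler only covers validation and execution; you still need the round trip where the model decides to call a function and then sees its result. A sketch of that loop using the pre-1.0 openai SDK's functions parameter (newer "tools"-based APIs use slightly different field names):

async def chat_with_functions(user_message: str, handler: FunctionCallingHandler):
    messages = [{"role": "user", "content": user_message}]

    response = await openai.ChatCompletion.acreate(
        model='gpt-4',
        messages=messages,
        functions=list(handler.function_schemas.values())
    )
    message = response.choices[0].message

    if message.get('function_call'):
        # Execute the requested function and feed the result back to the model
        function_result = await handler.handle_function_call(message['function_call'])
        messages.append(message)
        messages.append(function_result)
        response = await openai.ChatCompletion.acreate(model='gpt-4', messages=messages)

    return response.choices[0].message.content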
Production Considerations
Rate Limiting and Quota Management
from collections import deque
from datetime import datetime, timedelta

class RateLimitError(Exception):
"""Raised when a request would exceed a configured rate or quota limit."""

class RateLimiter:
def __init__(self,
requests_per_minute: int = 60,
requests_per_day: int = 10000,
tokens_per_minute: int = 90000,
tokens_per_day: int = 2000000):
self.rpm_limit = requests_per_minute
self.rpd_limit = requests_per_day
self.tpm_limit = tokens_per_minute
self.tpd_limit = tokens_per_day
self.request_history = deque()
self.token_history = deque()
self.daily_requests = 0
self.daily_tokens = 0
self.last_reset = datetime.now()
async def check_limits(self, estimated_tokens: int) -> bool:
now = datetime.now()
# Reset daily counters
if (now - self.last_reset).days >= 1:
self.daily_requests = 0
self.daily_tokens = 0
self.last_reset = now
# Clean old history
minute_ago = now - timedelta(minutes=1)
self.request_history = deque(
r for r in self.request_history if r > minute_ago
)
self.token_history = deque(
(t, tokens) for t, tokens in self.token_history if t > minute_ago
)
# Check rate limits
if len(self.request_history) >= self.rpm_limit:
wait_time = (self.request_history[0] - minute_ago).total_seconds()
raise RateLimitError(f"Rate limit exceeded. Wait {wait_time:.1f}s")
minute_tokens = sum(tokens for _, tokens in self.token_history)
if minute_tokens + estimated_tokens > self.tpm_limit:
raise RateLimitError("Token rate limit exceeded")
# Check daily limits
if self.daily_requests >= self.rpd_limit:
raise RateLimitError("Daily request limit exceeded")
if self.daily_tokens + estimated_tokens > self.tpd_limit:
raise RateLimitError("Daily token limit exceeded")
return True
def record_usage(self, tokens_used: int):
now = datetime.now()
self.request_history.append(now)
self.token_history.append((now, tokens_used))
self.daily_requests += 1
self.daily_tokens += tokens_used
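Wrapping every outbound call keeps you inside the provider's published limits instead of discovering them through 429 errors. A sketch, assuming the RateLimiter and LLMProvider above:

limiter = RateLimiter(requests_per_minute=60, tokens_per_minute=90000)

async def guarded_complete(provider: LLMProvider, prompt: str, **kwargs):
    # Rough token estimate up front; the real count comes back in the response
    estimated_tokens = len(prompt) // 4 + kwargs.get('max_tokens', 1000)
    await limiter.check_limits(estimated_tokens)

    response = await provider.complete(prompt=prompt, **kwargs)
    limiter.record_usage(response.usage['total_tokens'])
    return response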
Caching for Cost Optimization
import time

class LLMCache:
def __init__(self,
cache_ttl: int = 3600,
max_cache_size: int = 10000,
similarity_threshold: float = 0.95):
self.cache = {}
self.embeddings_cache = {}
self.cache_ttl = cache_ttl
self.max_cache_size = max_cache_size
self.similarity_threshold = similarity_threshold
self.embedding_model = self._load_embedding_model()
async def get_or_fetch(
self,
prompt: str,
fetch_func,
use_semantic_cache: bool = True
):
# Try exact match first
cache_key = self._get_cache_key(prompt)
if cache_key in self.cache:
entry = self.cache[cache_key]
if time.time() - entry['timestamp'] < self.cache_ttl:
return entry['response']
# Try semantic similarity if enabled
if use_semantic_cache:
similar_response = await self._find_similar_cached(prompt)
if similar_response:
return similar_response
# Fetch new response
response = await fetch_func(prompt)
# Cache the response
await self._cache_response(prompt, response)
return response
async def _find_similar_cached(self, prompt: str):
# Generate embedding for the prompt
prompt_embedding = await self._get_embedding(prompt)
best_match = None
best_similarity = 0
for cached_prompt, cached_data in self.cache.items():
if time.time() - cached_data['timestamp'] > self.cache_ttl:
continue
# Get cached embedding
cached_embedding = self.embeddings_cache.get(cached_prompt)
if not cached_embedding:
continue
# Calculate similarity
similarity = self._cosine_similarity(prompt_embedding, cached_embedding)
if similarity > best_similarity and similarity > self.similarity_threshold:
best_similarity = similarity
best_match = cached_data['response']
return best_match
async def _cache_response(self, prompt: str, response):
# Manage cache size
if len(self.cache) >= self.max_cache_size:
# Remove oldest entries
sorted_items = sorted(
self.cache.items(),
key=lambda x: x[1]['timestamp']
)
for key, _ in sorted_items[:len(self.cache) // 4]:
del self.cache[key]
if key in self.embeddings_cache:
del self.embeddings_cache[key]
# Cache new response
cache_key = self._get_cache_key(prompt)
self.cache[cache_key] = {
'response': response,
'timestamp': time.time(),
'access_count': 0
}
# Cache embedding for semantic search
embedding = await self._get_embedding(prompt)
self.embeddings_cache[cache_key] = embedding
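The cache leans on a few helpers that are easy to overlook: a stable cache key, an embedding call, and cosine similarity. Here is a minimal sketch of methods you could drop into LLMCache, assuming OpenAI's text-embedding-ada-002 endpoint via the pre-1.0 SDK (any embedding model works, and with a remote API _load_embedding_model can simply return None):

import hashlib
import math
import openai

def _get_cache_key(self, prompt: str) -> str:
    # Hash the normalized prompt so keys are fixed-length and fast to compare
    return hashlib.sha256(prompt.strip().lower().encode('utf-8')).hexdigest()

async def _get_embedding(self, prompt: str) -> list:
    response = await openai.Embedding.acreate(
        model='text-embedding-ada-002',
        input=prompt
    )
    return response['data'][0]['embedding']

def _cosine_similarity(self, a: list, b: list) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0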
Monitoring and Observability
import time
from contextlib import contextmanager

from prometheus_client import Counter, Histogram
from opentelemetry.trace import Status, StatusCode

class LLMObservability:
def __init__(self, service_name: str = "llm_service"):
self.service_name = service_name
self.metrics = self._initialize_metrics()
self.tracer = self._initialize_tracing()
def _initialize_metrics(self):
# Prometheus metrics
return {
'request_count': Counter(
'llm_requests_total',
'Total LLM requests',
['provider', 'model', 'status']
),
'request_duration': Histogram(
'llm_request_duration_seconds',
'LLM request duration',
['provider', 'model']
),
'tokens_used': Counter(
'llm_tokens_total',
'Total tokens used',
['provider', 'model', 'type']
),
'cost': Counter(
'llm_cost_dollars',
'Total cost in dollars',
['provider', 'model']
),
'cache_hits': Counter(
'llm_cache_hits_total',
'Cache hit count',
['cache_type']
)
}
@contextmanager
def trace_llm_call(self, provider: str, model: str, operation: str):
span = self.tracer.start_span(
f"llm.{operation}",
attributes={
'llm.provider': provider,
'llm.model': model,
'llm.operation': operation
}
)
start_time = time.time()
try:
yield span
# Record success metrics
self.metrics['request_count'].labels(
provider=provider,
model=model,
status='success'
).inc()
except Exception as e:
# Record error
span.set_status(Status(StatusCode.ERROR, str(e)))
self.metrics['request_count'].labels(
provider=provider,
model=model,
status='error'
).inc()
raise
finally:
duration = time.time() - start_time
self.metrics['request_duration'].labels(
provider=provider,
model=model
).observe(duration)
span.end()
def record_usage(self, provider: str, model: str, usage: dict, cost: float):
# Record token usage
self.metrics['tokens_used'].labels(
provider=provider,
model=model,
type='prompt'
).inc(usage.get('prompt_tokens', 0))
self.metrics['tokens_used'].labels(
provider=provider,
model=model,
type='completion'
).inc(usage.get('completion_tokens', 0))
# Record cost
self.metrics['cost'].labels(
provider=provider,
model=model
).inc(cost)
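Instrumenting a call is then just a context manager plus one usage record. A sketch, assuming the observability class and LLMProvider above:

observability = LLMObservability(service_name="support_assistant")

async def observed_complete(provider: LLMProvider, prompt: str):
    with observability.trace_llm_call('openai', 'gpt-4', 'chat_completion'):
        response = await provider.complete(prompt=prompt, provider='openai', model='gpt-4')

    observability.record_usage('openai', 'gpt-4', response.usage, response.cost)
    return response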
Best Practices and Lessons Learned
1. Always Have a Fallback
class ResilientLLMClient:
def __init__(self):
self.primary_provider = 'openai'
self.fallback_providers = ['anthropic', 'google']
self.local_model = self._load_local_model() # Last resort
async def get_completion(self, prompt: str, **kwargs):
# Try primary provider
try:
return await self._try_provider(self.primary_provider, prompt, **kwargs)
except Exception as e:
logger.warning(f"Primary provider failed: {e}")
# Try fallbacks
for provider in self.fallback_providers:
try:
return await self._try_provider(provider, prompt, **kwargs)
except Exception as e:
logger.warning(f"Fallback {provider} failed: {e}")
# Last resort: local model
logger.warning("All API providers failed, using local model")
return self._local_inference(prompt)
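The _try_provider helper can simply delegate to the multi-provider client from earlier. A sketch of one way to fill it in, assuming the LLMProvider class from the setup section (in practice you would construct the client once in __init__ rather than per call):

async def _try_provider(self, provider: str, prompt: str, **kwargs):
    # Delegate to the shared multi-provider client; any exception bubbles up
    # so the caller can move on to the next fallback
    client = LLMProvider()
    return await client.complete(prompt=prompt, provider=provider, **kwargs)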
2. Version Your Prompts
class PromptVersioning:
def __init__(self):
self.prompts = {
'customer_support_v1': {
'template': "You are a helpful customer support agent...",
'deprecated': True
},
'customer_support_v2': {
'template': "You are an experienced customer support specialist...",
'active': True,
'tested': True,
'metrics': {
'satisfaction_rate': 0.92,
'resolution_rate': 0.87
}
}
}
def get_prompt(self, prompt_id: str, version: Optional[str] = None):
if version:
full_id = f"{prompt_id}_{version}"
else:
# Get latest active version
full_id = self._get_latest_version(prompt_id)
prompt_data = self.prompts.get(full_id)
if not prompt_data or prompt_data.get('deprecated'):
raise ValueError(f"Prompt {full_id} not available")
return prompt_data['template']
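The _get_latest_version lookup just needs to pick the highest-numbered active, non-deprecated entry. One possible sketch, assuming the version-suffix naming shown above (customer_support_v1, customer_support_v2, ...):

def _get_latest_version(self, prompt_id: str) -> str:
    candidates = [
        key for key, data in self.prompts.items()
        if key.startswith(f"{prompt_id}_v")
        and data.get('active')
        and not data.get('deprecated')
    ]
    if not candidates:
        raise ValueError(f"No active version found for prompt {prompt_id}")
    # Sort numerically on the version suffix
    return max(candidates, key=lambda k: int(k.rsplit('_v', 1)[1]))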
3. Implement Safety Filters
import re

class SafetyViolation(Exception):
"""Raised when input or output trips a safety rule."""

class SafetyFilter:
def __init__(self):
self.blocked_patterns = [
r'(?i)password|secret|api[_-]?key',
r'(?i)hack|exploit|vulnerability',
r'(?i)personal[_-]?information|ssn|credit[_-]?card'
]
def check_input(self, text: str) -> bool:
# Check for PII
if self._contains_pii(text):
raise SafetyViolation("Input contains potential PII")
# Check for sensitive patterns
for pattern in self.blocked_patterns:
if re.search(pattern, text):
raise SafetyViolation(f"Input matches blocked pattern")
return True
def sanitize_output(self, text: str) -> str:
# Remove any accidentally generated sensitive info
sanitized = text
# Redact email addresses
sanitized = re.sub(r'[\w\.-]+@[\w\.-]+\.\w+', '[EMAIL REDACTED]', sanitized)
# Redact phone numbers
sanitized = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE REDACTED]', sanitized)
return sanitized
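Both hooks sit at the edges of the LLM call. A sketch, assuming the SafetyFilter and LLMProvider above:

safety = SafetyFilter()

async def safe_complete(provider: LLMProvider, user_input: str):
    # Reject unsafe input before spending tokens on it
    safety.check_input(user_input)

    response = await provider.complete(prompt=user_input)

    # Scrub anything sensitive the model may have echoed back
    return safety.sanitize_output(response.content)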
The Future of LLM Integration
As we look ahead, several trends are emerging:
- Multi-Modal Everything: Text, images, audio, and video in seamless interactions
- Edge LLMs: Models running directly on user devices for privacy and speed
- Specialized Models: Industry-specific LLMs trained on domain knowledge
- Agent Ecosystems: LLMs that can use tools and collaborate with other agents
The key to success isn't just knowing how to call an API - it's understanding how to build systems that leverage LLMs intelligently, safely, and cost-effectively. Start simple, measure everything, and always keep the user experience at the center of your design decisions.
Remember: LLMs are powerful tools, but they're still just tools. The magic happens when you combine them with thoughtful engineering, domain expertise, and a deep understanding of your users' needs.