API Authentication & Security: From Open Doors to Fort Knox
It was my second month at Amazon, and I was tasked with adding "simple authentication" to our internal API. How hard could it be? I added a basic API key check and called it done. Three days later, our security team found that anyone could generate valid API keys just by incrementing the last digit. That "simple" fix led to a complete security audit and a crash course in API security that changed how I think about authentication forever.
Since then, I've designed authentication systems for millions of users, implemented OAuth flows for enterprise clients, and learned that API security isn't just about keeping bad actors out – it's about building trust with every single request.
Table Of Contents
- The Great API Key Disaster of 2022
- OAuth 2.0 Implementation
- Security Best Practices
- Production Security Checklist
- Final Thoughts: Security as a Mindset
The Great API Key Disaster of 2022
Here's the "brilliant" authentication system that got me in trouble:
# My original "secure" API - spoiler alert: it wasn't
import random
def generate_api_key():
    """Return a key of the form ``api_key_NNNN`` (the article's insecure demo).

    NNNN is drawn with ``random.randint(1000, 9999)``, so only 9000 distinct
    keys exist and they are trivially enumerable.
    """
    # Generate a "random" API key
    suffix = random.randint(1000, 9999)
    return "api_key_" + str(suffix)
def validate_api_key(api_key):
    """Return True when *api_key* merely looks like ``api_key_NNNN``.

    This is the article's insecure demo: it checks the format only
    (prefix, total length 12, numeric part within 1000..9999) and never
    consults any record of issued keys.
    """
    # Check if API key follows our format
    well_formed = api_key.startswith("api_key_") and len(api_key) == 12
    if not well_formed:
        return False
    number_part = api_key.split("_")[2]
    return 1000 <= int(number_part) <= 9999
@app.route('/api/users')
def get_users():
    """Serve the user list to any caller whose key passes validate_api_key.

    Part of the article's insecure demo: the key is read from the
    ``Authorization`` header and only format-checked.
    """
    supplied_key = request.headers.get('Authorization')
    if validate_api_key(supplied_key):
        # Return sensitive user data - YIKES!
        return {"users": [{"id": 1, "email": "maya@company.com", "ssn": "123-45-6789"}]}
    return {"error": "Invalid API key"}, 401
# The security team's reaction when they found this:
# for i in range(1000, 10000):
# response = requests.get('/api/users', headers={'Authorization': f'api_key_{i}'})
# if response.status_code == 200:
# print(f"FOUND VALID KEY: api_key_{i}")
# break
Needless to say, this approach had some... issues. Here's how I rebuilt it properly:
# Secure API authentication system
import hashlib
import hmac
import json
import logging
import os
import secrets
import time
from datetime import datetime, timedelta
from functools import wraps
from typing import Optional

import bcrypt
import jwt
import redis
class APIKeyManager:
    """Issue, validate and revoke API keys using a database plus a Redis cache.

    Only the SHA-256 digest of a key is stored or cached; the raw key is
    handed to the caller once, in the return value of ``generate_api_key``.

    The persistence helpers ``_store_api_key``, ``_get_api_key_data``,
    ``_update_key_usage``, ``_deactivate_api_key`` and ``_get_api_key_by_id``
    are called below but are not defined in this snippet; the application
    must provide them.
    """

    KEY_PREFIX = 'ask_'  # Prefix for identification
    CACHE_TTL_SECONDS = 86400  # 24 hours cache

    def __init__(self):
        self.redis_client = redis.Redis()
        self.logger = logging.getLogger(__name__)

    def _cache_key_data(self, key_hash: str, key_data: dict) -> None:
        """Cache key metadata in Redis under its hash for fast lookup."""
        self.redis_client.setex(
            f"api_key:{key_hash}",
            self.CACHE_TTL_SECONDS,
            json.dumps(key_data)
        )

    def generate_api_key(self, user_id: str, description: str = "") -> dict:
        """Generate a cryptographically secure API key.

        Args:
            user_id: Owner of the new key.
            description: Optional free-text label stored with the key.

        Returns:
            dict with ``key_id``, the full ``api_key`` (including the
            ``ask_`` prefix) and ``created_at``.
        """
        # Generate random key
        api_key = secrets.token_urlsafe(32)

        # Create key hash for storage (never store raw keys!)
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()

        # Store key metadata
        key_data = {
            'user_id': user_id,
            'key_hash': key_hash,
            'description': description,
            # NOTE: naive UTC timestamp, kept for compatibility with stored data.
            'created_at': datetime.utcnow().isoformat(),
            'last_used': None,
            'usage_count': 0,
            'is_active': True
        }

        # Store in database
        key_id = self._store_api_key(key_data)

        # Cache for fast lookup
        self._cache_key_data(key_hash, key_data)

        self.logger.info("API key generated for user %s", user_id)
        return {
            'key_id': key_id,
            'api_key': f"{self.KEY_PREFIX}{api_key}",
            'created_at': key_data['created_at']
        }

    def validate_api_key(self, api_key: str) -> Optional[dict]:
        """Validate an API key and return its stored metadata.

        Returns:
            The key's metadata dict, or ``None`` when the key is empty,
            lacks the ``ask_`` prefix, is unknown, or is inactive.
        """
        if not api_key or not api_key.startswith(self.KEY_PREFIX):
            return None

        # Extract actual key (drop the prefix) and hash it for lookup
        raw_key = api_key[len(self.KEY_PREFIX):]
        key_hash = hashlib.sha256(raw_key.encode()).hexdigest()

        # Check cache first
        cached_data = self.redis_client.get(f"api_key:{key_hash}")
        if cached_data:
            key_data = json.loads(cached_data)
        else:
            # Check database
            key_data = self._get_api_key_data(key_hash)
            if key_data:
                # Cache for future requests
                self._cache_key_data(key_hash, key_data)

        if not key_data or not key_data.get('is_active'):
            return None

        # Update usage stats
        self._update_key_usage(key_hash)
        return key_data

    def revoke_api_key(self, key_id: str) -> bool:
        """Revoke an API key and purge it from the cache.

        Returns:
            Whatever ``_deactivate_api_key`` reports for *key_id*.
        """
        # Look the record up before deactivating so the cache entry can
        # still be located once the key is inactive.
        key_data = self._get_api_key_by_id(key_id)

        success = self._deactivate_api_key(key_id)
        if success and key_data:
            # Clear from cache; otherwise the cached copy would keep the
            # key valid until its TTL runs out.
            self.redis_client.delete(f"api_key:{key_data['key_hash']}")
        return success
# JWT Token Manager
class JWTManager:
    """Issue, validate, refresh and revoke HS256-signed JWTs.

    Refresh tokens are tracked in Redis under ``refresh_token:<jti>``;
    revoked tokens are tracked under ``blacklist:<jti>``.
    """

    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        self.algorithm = 'HS256'
        self.access_token_expires = timedelta(minutes=15)
        self.refresh_token_expires = timedelta(days=7)
        self.redis_client = redis.Redis()

    def generate_tokens(self, user_id: str, user_role: str = 'user') -> dict:
        """Generate access and refresh tokens.

        Returns:
            dict with ``access_token``, ``refresh_token``, ``token_type``
            and ``expires_in`` (access-token lifetime in seconds).
        """
        now = datetime.utcnow()

        # Access token (short-lived)
        access_payload = {
            'user_id': user_id,
            'role': user_role,
            'type': 'access',
            'iat': now,
            'exp': now + self.access_token_expires,
            'jti': secrets.token_urlsafe(16)  # JWT ID for tracking
        }

        # Refresh token (long-lived)
        refresh_payload = {
            'user_id': user_id,
            'type': 'refresh',
            'iat': now,
            'exp': now + self.refresh_token_expires,
            'jti': secrets.token_urlsafe(16)
        }

        access_token = jwt.encode(access_payload, self.secret_key, algorithm=self.algorithm)
        refresh_token = jwt.encode(refresh_payload, self.secret_key, algorithm=self.algorithm)

        # Store refresh token for validation
        self.redis_client.setex(
            f"refresh_token:{refresh_payload['jti']}",
            int(self.refresh_token_expires.total_seconds()),
            user_id
        )

        return {
            'access_token': access_token,
            'refresh_token': refresh_token,
            'token_type': 'Bearer',
            'expires_in': int(self.access_token_expires.total_seconds())
        }

    def validate_access_token(self, token: str) -> Optional[dict]:
        """Validate and decode an access token.

        Returns:
            The decoded payload, or ``None`` when the token is expired,
            malformed, not an access token, lacks a ``jti``, or has been
            blacklisted.
        """
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        except jwt.InvalidTokenError:
            # Also covers jwt.ExpiredSignatureError, which is a subclass.
            return None

        # Check token type
        if payload.get('type') != 'access':
            return None

        # A token without a jti cannot be checked against the blacklist.
        jti = payload.get('jti')
        if not jti:
            return None

        # Check if token is blacklisted
        if self.redis_client.get(f"blacklist:{jti}"):
            return None

        return payload

    def refresh_access_token(self, refresh_token: str) -> Optional[dict]:
        """Generate a new token pair from a refresh token.

        Returns:
            The dict produced by ``generate_tokens``, or ``None`` when the
            refresh token is invalid, expired, or no longer stored in Redis.
        """
        try:
            payload = jwt.decode(refresh_token, self.secret_key, algorithms=[self.algorithm])
        except jwt.InvalidTokenError:
            return None

        jti = payload.get('jti')
        if payload.get('type') != 'refresh' or not jti:
            return None

        # Check if refresh token exists in Redis
        user_id = self.redis_client.get(f"refresh_token:{jti}")
        if not user_id:
            return None
        if isinstance(user_id, bytes):
            user_id = user_id.decode('utf-8')

        # NOTE(review): the presented refresh token stays valid after this
        # call; consider rotating it if clients can handle that.
        return self.generate_tokens(user_id, 'user')  # You'd get role from DB

    def revoke_token(self, token: str) -> bool:
        """Revoke an access or refresh token.

        Access tokens are blacklisted until they expire. Refresh tokens are
        additionally removed from the Redis store that
        ``refresh_access_token`` consults; blacklisting alone would not stop
        them from being used.

        Returns:
            True when the token was valid and has been revoked, False when
            it could not be decoded or carries no ``jti``.
        """
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        except jwt.InvalidTokenError:
            return False

        jti = payload.get('jti')
        if not jti:
            return False

        if payload.get('type') == 'refresh':
            self.redis_client.delete(f"refresh_token:{jti}")

        # Add to blacklist until expiration
        exp = payload.get('exp')
        if exp is None:
            ttl = self.refresh_token_expires.total_seconds()
        else:
            ttl = exp - time.time()
        if ttl > 0:
            self.redis_client.setex(
                f"blacklist:{jti}",
                int(ttl) + 1,  # round up: an expiry of 0 is not a valid setex TTL
                'revoked'
            )
        return True
# Rate Limiting
class RateLimiter:
    """Sliding-window-log rate limiter backed by a Redis sorted set."""

    def __init__(self):
        self.redis_client = redis.Redis()

    def check_rate_limit(self, identifier: str, limit: int, window: int) -> dict:
        """
        Check if request is within rate limit

        Args:
            identifier: User ID, IP address, or API key
            limit: Maximum requests allowed
            window: Time window in seconds

        Returns:
            dict: Rate limit status and metadata (``allowed``, ``limit``,
            ``remaining``, ``reset_time``, ``retry_after``)
        """
        current_time = int(time.time())
        window_start = current_time - window

        # Sliding window log approach
        key = f"rate_limit:{identifier}"

        # Remove old entries
        self.redis_client.zremrangebyscore(key, 0, window_start)

        # Get current request count
        current_requests = self.redis_client.zcard(key)

        if current_requests >= limit:
            # Rate limit exceeded
            oldest_request = self.redis_client.zrange(key, 0, 0, withscores=True)
            if oldest_request:
                reset_time = int(oldest_request[0][1]) + window
            else:
                reset_time = current_time + window
            return {
                'allowed': False,
                'limit': limit,
                'remaining': 0,
                'reset_time': reset_time,
                'retry_after': reset_time - current_time
            }

        # Add current request. The member must be unique per request:
        # using only the timestamp would make every request in the same
        # second overwrite one entry, so bursts would never be counted.
        member = f"{current_time}:{secrets.token_hex(8)}"
        self.redis_client.zadd(key, {member: current_time})
        self.redis_client.expire(key, window)

        return {
            'allowed': True,
            'limit': limit,
            'remaining': limit - current_requests - 1,
            'reset_time': current_time + window,
            'retry_after': 0
        }
# Authentication Decorators
def require_api_key(f):
    """Decorator: reject the request with 401 unless X-API-Key is valid.

    On success, ``request.user_id`` and ``request.auth_method`` are set
    before the wrapped view runs.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        supplied_key = request.headers.get('X-API-Key')
        if not supplied_key:
            return {'error': 'API key required'}, 401

        key_record = api_key_manager.validate_api_key(supplied_key)
        if not key_record:
            return {'error': 'Invalid API key'}, 401

        # Add user context to request
        request.user_id = key_record['user_id']
        request.auth_method = 'api_key'
        return f(*args, **kwargs)

    return wrapper
def require_jwt_token(f):
    """Decorator: reject the request with 401 unless a valid Bearer JWT is sent.

    On success, ``request.user_id``, ``request.user_role`` and
    ``request.auth_method`` are set before the wrapped view runs.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        header_value = request.headers.get('Authorization')
        has_bearer = bool(header_value) and header_value.startswith('Bearer ')
        if not has_bearer:
            return {'error': 'Bearer token required'}, 401

        pieces = header_value.split(' ')
        claims = jwt_manager.validate_access_token(pieces[1])
        if not claims:
            return {'error': 'Invalid or expired token'}, 401

        # Add user context to request
        request.user_id = claims['user_id']
        request.user_role = claims['role']
        request.auth_method = 'jwt'
        return f(*args, **kwargs)

    return wrapper
def _with_rate_limit_headers(response, extra_headers: dict):
    """Attach *extra_headers* to a view's return value without altering it otherwise.

    Handles the return shapes a Flask view may produce: a response object,
    ``(body, status)``, ``(body, headers)``, ``(body, status, headers)``,
    or a bare body.
    """
    # Response-like object (e.g. from make_response or redirect): set the
    # headers on it directly so its own status code is preserved.
    if hasattr(response, 'headers') and hasattr(response, 'status_code'):
        for name, value in extra_headers.items():
            response.headers[name] = value
        return response

    status_code, headers = 200, {}
    if isinstance(response, tuple):
        data = response[0]
        rest = response[1:]
        if len(rest) == 1 and isinstance(rest[0], (dict, list)):
            # (body, headers) form
            headers = rest[0]
        elif rest:
            status_code = rest[0]
            if len(rest) > 1:
                headers = rest[1]
    else:
        data = response

    # Copy so the dict returned by the view is never mutated.
    merged = dict(headers or {})
    merged.update(extra_headers)
    return data, status_code, merged


def rate_limit(limit: int, window: int = 3600):
    """Rate limiting decorator.

    Args:
        limit: Maximum requests allowed per window.
        window: Window length in seconds.

    Requests over the limit get a 429 with ``Retry-After`` and
    ``X-RateLimit-*`` headers; allowed requests get the ``X-RateLimit-*``
    headers added to the view's response.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Use user ID if authenticated, otherwise IP
            if hasattr(request, 'user_id'):
                identifier = f"user:{request.user_id}"
            else:
                identifier = f"ip:{request.remote_addr}"

            rate_limit_result = rate_limiter.check_rate_limit(identifier, limit, window)

            if not rate_limit_result['allowed']:
                response = {
                    'error': 'Rate limit exceeded',
                    'retry_after': rate_limit_result['retry_after']
                }
                headers = {
                    'X-RateLimit-Limit': str(limit),
                    'X-RateLimit-Remaining': '0',
                    'X-RateLimit-Reset': str(rate_limit_result['reset_time']),
                    'Retry-After': str(rate_limit_result['retry_after'])
                }
                return response, 429, headers

            # Add rate limit headers to response
            quota_headers = {
                'X-RateLimit-Limit': str(limit),
                'X-RateLimit-Remaining': str(rate_limit_result['remaining']),
                'X-RateLimit-Reset': str(rate_limit_result['reset_time'])
            }
            return _with_rate_limit_headers(f(*args, **kwargs), quota_headers)

        return decorated_function
    return decorator
# Initialize managers
api_key_manager = APIKeyManager()

# Never sign tokens with a hard-coded fallback secret: anyone who has read
# the source could forge tokens. JWT_SECRET is preferred; JWT_SECRET_KEY is
# also accepted because SecurityConfig requires that name.
_jwt_secret = os.environ.get('JWT_SECRET') or os.environ.get('JWT_SECRET_KEY')
if not _jwt_secret:
    # Keep the module importable (e.g. in local development) but make the
    # secret unguessable. Tokens will not survive a restart or be shared
    # between processes.
    _jwt_secret = secrets.token_urlsafe(64)
    logging.getLogger(__name__).warning(
        "JWT_SECRET is not set; using a random per-process signing secret"
    )
jwt_manager = JWTManager(_jwt_secret)
rate_limiter = RateLimiter()
OAuth 2.0 Implementation
# OAuth 2.0 Authorization Server
import base64
import urllib.parse
from typing import Optional
class OAuthAuthorizationServer:
    """Minimal OAuth 2.0 authorization-code server.

    Clients are held in memory (``self.clients``); authorization codes,
    access tokens and refresh tokens are stored in Redis with expiries.
    """

    AUTH_CODE_TTL_SECONDS = 600            # Authorization codes expire in 10 minutes
    ACCESS_TOKEN_TTL_SECONDS = 3600
    REFRESH_TOKEN_TTL_SECONDS = 86400 * 30  # 30 days

    def __init__(self):
        self.redis_client = redis.Redis()
        self.clients = {}  # In production, load from database
        self.logger = logging.getLogger(__name__)

    def register_client(self, client_name: str, redirect_uris: list) -> dict:
        """Register OAuth client application.

        Returns:
            dict with the generated ``client_id`` and ``client_secret``.
        """
        client_id = secrets.token_urlsafe(16)
        client_secret = secrets.token_urlsafe(32)

        client_data = {
            'client_id': client_id,
            'client_secret': client_secret,
            'name': client_name,
            'redirect_uris': redirect_uris,
            'created_at': datetime.utcnow().isoformat()
        }

        # Store client data (in production, use database)
        self.clients[client_id] = client_data

        return {
            'client_id': client_id,
            'client_secret': client_secret
        }

    def generate_authorization_code(self, client_id: str, user_id: str,
                                    redirect_uri: str, scope: str) -> str:
        """Generate authorization code for OAuth flow.

        Raises:
            ValueError: if *client_id* is unknown or *redirect_uri* is not
                one of the client's registered URIs.
        """
        if client_id not in self.clients:
            raise ValueError("Invalid client_id")

        client = self.clients[client_id]
        if redirect_uri not in client['redirect_uris']:
            raise ValueError("Invalid redirect_uri")

        # Generate authorization code
        auth_code = secrets.token_urlsafe(32)

        # Store authorization code data
        code_data = {
            'client_id': client_id,
            'user_id': user_id,
            'redirect_uri': redirect_uri,
            'scope': scope,
            'created_at': time.time()
        }

        self.redis_client.setex(
            f"auth_code:{auth_code}",
            self.AUTH_CODE_TTL_SECONDS,
            json.dumps(code_data)
        )

        return auth_code

    def exchange_code_for_token(self, client_id: str, client_secret: str,
                                auth_code: str, redirect_uri: str) -> dict:
        """Exchange authorization code for access token.

        Raises:
            ValueError: on unknown client, wrong or missing client secret,
                unknown/expired/already-used code, or a code issued to a
                different client or redirect URI.
        """
        # Validate client credentials
        if client_id not in self.clients:
            raise ValueError("Invalid client")

        client = self.clients[client_id]
        # Constant-time comparison so response timing does not leak how much
        # of the secret matched; a missing secret is rejected outright.
        if not client_secret or not hmac.compare_digest(
            client['client_secret'].encode(), client_secret.encode()
        ):
            raise ValueError("Invalid client credentials")

        # Get and validate authorization code
        code_key = f"auth_code:{auth_code}"
        code_data_json = self.redis_client.get(code_key)
        if not code_data_json:
            raise ValueError("Invalid or expired authorization code")

        code_data = json.loads(code_data_json)

        # Validate code belongs to this client and redirect URI
        if (code_data['client_id'] != client_id or
                code_data['redirect_uri'] != redirect_uri):
            raise ValueError("Authorization code mismatch")

        # Delete used authorization code. Only the caller whose delete
        # actually removed the key may continue, so two concurrent requests
        # cannot both redeem the same code.
        if not self.redis_client.delete(code_key):
            raise ValueError("Invalid or expired authorization code")

        # Generate access token
        access_token = secrets.token_urlsafe(32)
        refresh_token = secrets.token_urlsafe(32)

        token_data = {
            'access_token': access_token,
            'token_type': 'Bearer',
            'expires_in': self.ACCESS_TOKEN_TTL_SECONDS,
            'refresh_token': refresh_token,
            'scope': code_data['scope'],
            'user_id': code_data['user_id'],
            'client_id': client_id
        }

        # Store access token
        self.redis_client.setex(
            f"access_token:{access_token}",
            self.ACCESS_TOKEN_TTL_SECONDS,
            json.dumps(token_data)
        )

        # Store refresh token (longer expiry)
        self.redis_client.setex(
            f"refresh_token:{refresh_token}",
            self.REFRESH_TOKEN_TTL_SECONDS,
            json.dumps(token_data)
        )

        self.logger.info("Access token issued for user %s", code_data['user_id'])

        return {
            'access_token': access_token,
            'token_type': 'Bearer',
            'expires_in': self.ACCESS_TOKEN_TTL_SECONDS,
            'refresh_token': refresh_token,
            'scope': code_data['scope']
        }

    def validate_access_token(self, access_token: str) -> Optional[dict]:
        """Validate OAuth access token.

        Returns:
            The stored token data, or ``None`` when the token is not found
            in Redis.
        """
        token_data_json = self.redis_client.get(f"access_token:{access_token}")
        if not token_data_json:
            return None
        return json.loads(token_data_json)
# OAuth Client Implementation
class OAuthClient:
    """Client side of the OAuth 2.0 authorization-code flow."""

    def __init__(self, client_id: str, client_secret: str,
                 authorization_url: str, token_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.authorization_url = authorization_url
        self.token_url = token_url

    def get_authorization_url(self, redirect_uri: str, scope: str,
                              state: Optional[str] = None) -> str:
        """Generate authorization URL for user to visit.

        Args:
            redirect_uri: Where the authorization server should send the user back.
            scope: Requested scope string.
            state: Optional opaque value echoed back by the server; it is
                only added to the URL when truthy.
        """
        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': redirect_uri,
            'scope': scope
        }
        if state:
            params['state'] = state

        return f"{self.authorization_url}?{urllib.parse.urlencode(params)}"

    def exchange_code_for_token(self, code: str, redirect_uri: str,
                                timeout: float = 10.0) -> dict:
        """Exchange authorization code for access token.

        Args:
            code: Authorization code received on the redirect.
            redirect_uri: The redirect URI used in the authorization request.
            timeout: Seconds to wait for the token endpoint. Without a
                timeout the HTTP call can block indefinitely.

        Returns:
            The token endpoint's JSON response.

        Raises:
            requests.HTTPError: if the token endpoint returns an error status.
        """
        # Client credentials are sent via HTTP Basic authentication.
        auth_header = base64.b64encode(
            f"{self.client_id}:{self.client_secret}".encode()
        ).decode()

        headers = {
            'Authorization': f'Basic {auth_header}',
            'Content-Type': 'application/x-www-form-urlencoded'
        }
        data = {
            'grant_type': 'authorization_code',
            'code': code,
            'redirect_uri': redirect_uri
        }

        response = requests.post(self.token_url, headers=headers, data=data,
                                 timeout=timeout)
        response.raise_for_status()
        return response.json()
# OAuth Routes
@app.route('/oauth/authorize')
def oauth_authorize():
    """OAuth authorization endpoint.

    Validates the client, sends unauthenticated users to the login page,
    then redirects back to the client's redirect URI with an authorization
    code (and the ``state`` value, when one was supplied).
    """
    client_id = request.args.get('client_id')
    redirect_uri = request.args.get('redirect_uri')
    scope = request.args.get('scope', 'read')
    state = request.args.get('state')

    # Validate client
    if client_id not in oauth_server.clients:
        return {'error': 'invalid_client'}, 400

    # In a real app, you'd show a consent screen here
    # For demo, we'll auto-approve

    # Check if user is logged in
    user_id = session.get('user_id')
    if not user_id:
        # Redirect to login with return URL
        login_url = f"/login?return_to={urllib.parse.quote(request.url)}"
        return redirect(login_url)

    # Generate authorization code (this also checks redirect_uri against
    # the client's registered URIs and raises ValueError on mismatch)
    try:
        auth_code = oauth_server.generate_authorization_code(
            client_id, user_id, redirect_uri, scope
        )
    except ValueError as e:
        return {'error': str(e)}, 400

    # Redirect back to client with code
    params = {'code': auth_code}
    if state:
        params['state'] = state

    # A registered redirect URI may already carry a query string; append
    # with '&' in that case instead of adding a second '?'.
    separator = '&' if '?' in redirect_uri else '?'
    redirect_url = f"{redirect_uri}{separator}{urllib.parse.urlencode(params)}"
    return redirect(redirect_url)
@app.route('/oauth/token', methods=['POST'])
def oauth_token():
    """OAuth token endpoint.

    Accepts client credentials via HTTP Basic authentication or form
    fields, and supports the ``authorization_code`` grant only.
    """
    # Get client credentials from Authorization header or form data
    auth_header = request.headers.get('Authorization')
    if auth_header and auth_header.startswith('Basic '):
        # Decode basic auth. Bad base64, non-UTF-8 bytes and a missing ':'
        # all raise ValueError (or a subclass); answer 401 instead of
        # letting the request fail with a server error.
        try:
            credentials = base64.b64decode(auth_header[6:]).decode()
            client_id, client_secret = credentials.split(':', 1)
        except ValueError:
            return {'error': 'invalid_client'}, 401, {'WWW-Authenticate': 'Basic'}
    else:
        client_id = request.form.get('client_id')
        client_secret = request.form.get('client_secret')

    grant_type = request.form.get('grant_type')

    if grant_type == 'authorization_code':
        code = request.form.get('code')
        redirect_uri = request.form.get('redirect_uri')

        try:
            token_data = oauth_server.exchange_code_for_token(
                client_id, client_secret, code, redirect_uri
            )
            return token_data
        except ValueError as e:
            return {'error': 'invalid_grant', 'error_description': str(e)}, 400

    return {'error': 'unsupported_grant_type'}, 400
Security Best Practices
CORS (Cross-Origin Resource Sharing)
from flask_cors import CORS
# Secure CORS configuration
def configure_cors(app):
    """Configure CORS with security best practices.

    Only an explicit list of origins is allowed; localhost origins are
    added when ``app.config['DEBUG']`` is truthy.
    """
    # Don't use CORS(app) - too permissive!

    # Specific origins only
    allowed_origins = [
        'https://myapp.com',
        'https://api.myapp.com',
        'https://admin.myapp.com'
    ]

    # Development origins (only in development)
    if app.config.get('DEBUG'):
        allowed_origins.extend([
            'http://localhost:3000',
            'http://localhost:8080',
            'http://127.0.0.1:3000'
        ])

    CORS(app,
         origins=allowed_origins,
         methods=['GET', 'POST', 'PUT', 'DELETE'],
         allow_headers=['Content-Type', 'Authorization', 'X-API-Key'],
         # Expose every header the rate_limit decorator emits; browser
         # JavaScript cannot read response headers that are not listed here.
         expose_headers=[
             'X-RateLimit-Limit',
             'X-RateLimit-Remaining',
             'X-RateLimit-Reset',
             'Retry-After',
         ],
         supports_credentials=True,
         max_age=3600)
# Custom CORS headers for specific routes
@app.route('/api/public-data')
def public_data():
    """Public endpoint with relaxed CORS: any origin may issue GET requests."""
    relaxed_cors = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Methods': 'GET',
    }
    resp = make_response({'data': 'public'})
    for header_name, header_value in relaxed_cors.items():
        resp.headers[header_name] = header_value
    return resp
@app.route('/api/sensitive-data')
@require_jwt_token
def sensitive_data():
    """Sensitive endpoint with strict CORS: one fixed origin, credentials allowed."""
    strict_cors = {
        'Access-Control-Allow-Origin': 'https://myapp.com',
        'Access-Control-Allow-Credentials': 'true',
    }
    resp = make_response({'data': 'sensitive'})
    for header_name, header_value in strict_cors.items():
        resp.headers[header_name] = header_value
    return resp
Input Validation and Sanitization
from marshmallow import Schema, fields, validate, ValidationError
import bleach
class UserRegistrationSchema(Schema):
    """Schema for user registration validation."""

    email = fields.Email(required=True, validate=validate.Length(max=255))
    password = fields.Str(required=True, validate=validate.Length(min=8, max=128))
    first_name = fields.Str(required=True, validate=validate.Length(min=1, max=50))
    last_name = fields.Str(required=True, validate=validate.Length(min=1, max=50))
    age = fields.Int(validate=validate.Range(min=13, max=120))
    # ``load_default`` replaces the deprecated ``missing`` argument
    # (marshmallow 3.13+); the field loads as None when omitted.
    website = fields.Url(load_default=None)
class APIInputValidator:
    """Validate request input against marshmallow schemas and sanitize strings."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def validate_json_input(self, schema: Schema, data: dict,
                            raw_fields=('password',)) -> dict:
        """Validate and sanitize JSON input.

        Args:
            schema: marshmallow schema to load *data* with.
            data: Parsed JSON body.
            raw_fields: Names of string fields that must NOT be sanitized.
                Secrets such as passwords have to be stored exactly as
                typed; stripping markup or whitespace from them would
                silently change the credential.

        Returns:
            The validated data with every other string field stripped of
            HTML tags and surrounding whitespace.

        Raises:
            ValueError: if schema validation fails.
        """
        try:
            # Validate against schema
            validated_data = schema.load(data)
        except ValidationError as e:
            self.logger.warning("Validation error: %s", e.messages)
            raise ValueError(f"Invalid input: {e.messages}") from e

        # Sanitize string fields
        for key, value in validated_data.items():
            if isinstance(value, str) and key not in raw_fields:
                # Remove potentially dangerous HTML/script tags
                validated_data[key] = bleach.clean(
                    value,
                    tags=[],  # No HTML tags allowed
                    attributes={},
                    strip=True
                ).strip()

        return validated_data

    def validate_query_params(self, allowed_params: dict) -> dict:
        """Validate query parameters.

        Args:
            allowed_params: Maps parameter name to a validator callable that
                returns the converted value or raises ValueError.

        Returns:
            dict of the parameters that were present, after validation.

        Raises:
            ValueError: naming the first parameter whose validator failed.
        """
        validated_params = {}

        for param, validator in allowed_params.items():
            value = request.args.get(param)
            if value is not None:
                try:
                    # Apply validator function
                    validated_params[param] = validator(value)
                except ValueError as e:
                    raise ValueError(f"Invalid parameter '{param}': {str(e)}") from e

        return validated_params
def validate_input(schema: Schema):
    """Decorator for input validation.

    The wrapped view only runs when the request carries a JSON body that
    passes *schema*; the validated data is exposed as
    ``request.validated_data``.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Refuse non-JSON requests instead of calling the view without
            # validated_data, which would bypass validation entirely.
            if not request.is_json:
                return {'error': 'Request body must be JSON'}, 415

            validator = APIInputValidator()
            try:
                # Validate JSON body
                request.validated_data = validator.validate_json_input(
                    schema, request.json
                )
            except ValueError as e:
                return {'error': str(e)}, 400

            return f(*args, **kwargs)

        return decorated_function
    return decorator
# Usage
@app.route('/api/users', methods=['POST'])
@validate_input(UserRegistrationSchema())
@rate_limit(10, 3600)  # 10 registrations per hour
def create_user():
    """Register a user from the validated request body.

    Responds 409 when the email is already registered, otherwise 201 with
    the new user's id.
    """
    payload = request.validated_data

    # Additional business logic validation
    already_registered = user_repository.email_exists(payload['email'])
    if already_registered:
        return {'error': 'Email already registered'}, 409

    # Create user
    new_user_id = user_service.create_user(payload)
    body = {'user_id': new_user_id, 'message': 'User created successfully'}
    return body, 201
API Security Headers
@app.after_request
def add_security_headers(response):
    """Add security headers to all responses."""
    # Stop browsers from MIME-sniffing the response type
    response.headers['X-Content-Type-Options'] = 'nosniff'
    # Forbid rendering inside frames (clickjacking defence)
    response.headers['X-Frame-Options'] = 'DENY'
    # The legacy XSS auditor is deprecated and "1; mode=block" can itself
    # introduce vulnerabilities in old browsers; current guidance is to
    # disable it explicitly and rely on the Content-Security-Policy below.
    response.headers['X-XSS-Protection'] = '0'

    # HTTPS enforcement
    response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'

    # Content Security Policy
    # NOTE(review): 'unsafe-inline' in script-src weakens XSS protection;
    # tighten it once no served page relies on inline scripts.
    response.headers['Content-Security-Policy'] = (
        "default-src 'self'; "
        "script-src 'self' 'unsafe-inline'; "
        "style-src 'self' 'unsafe-inline'; "
        "img-src 'self' data: https:; "
        "connect-src 'self'"
    )

    # Referrer Policy
    response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'

    # Permissions Policy
    response.headers['Permissions-Policy'] = (
        "camera=(), microphone=(), geolocation=()"
    )

    return response
# API-specific security middleware
class APISecurityMiddleware:
    """WSGI middleware that logs ``/api/`` requests and flags suspicious ones.

    Requests are only logged, never blocked; every request is passed on to
    the wrapped application.
    """

    # Lower-case substrings that indicate a probable attack attempt.
    SUSPICIOUS_PATTERNS = (
        '../',           # Path traversal
        '<script',       # XSS attempts
        'union select',  # SQL injection
        'eval(',         # Code injection
    )

    def __init__(self, app):
        self.app = app
        self.logger = logging.getLogger(__name__)

    def __call__(self, environ, start_response):
        path = environ.get('PATH_INFO', '')

        # Log all API requests
        if path.startswith('/api/'):
            self.logger.info(
                "API Request: %s %s from %s",
                environ.get('REQUEST_METHOD'), path, environ.get('REMOTE_ADDR')
            )

            # Check for suspicious patterns
            self._check_suspicious_activity(environ)

        return self.app(environ, start_response)

    def _check_suspicious_activity(self, environ):
        """Log a warning when the path, query string or User-Agent matches a pattern.

        At most one warning is logged per request (the first matching pattern).
        """
        path = environ.get('PATH_INFO', '')
        # The query string is percent-encoded in the WSGI environ; decode it
        # so that e.g. "union%20select" is still recognised.
        query = urllib.parse.unquote_plus(environ.get('QUERY_STRING', ''))
        user_agent = environ.get('HTTP_USER_AGENT', '')

        # Lower-case once instead of on every pattern comparison.
        haystacks = [text.lower() for text in (path, query, user_agent)]

        for pattern in self.SUSPICIOUS_PATTERNS:
            if any(pattern in text for text in haystacks):
                self.logger.warning(
                    "Suspicious request detected: %s in %s %s from %s",
                    pattern, environ.get('REQUEST_METHOD'), path,
                    environ.get('REMOTE_ADDR')
                )
                break
# Apply middleware: wrap the app's existing WSGI callable so each request
# goes through APISecurityMiddleware.__call__, which then invokes the
# wrapped callable.
app.wsgi_app = APISecurityMiddleware(app.wsgi_app)
Production Security Checklist
# Security configuration for production
class SecurityConfig:
    """Production security configuration.

    Attribute values are read from the environment once, when the class
    body is executed.
    """

    # Environment variables that validate_config() insists on.
    REQUIRED_ENV_VARS = ('JWT_SECRET_KEY', 'REDIS_URL', 'DATABASE_URL')

    # Authentication
    JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY')  # Must be set
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(minutes=15)
    JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=7)

    # Rate limiting
    RATE_LIMIT_STORAGE_URL = os.environ.get('REDIS_URL')
    DEFAULT_RATE_LIMIT = "1000 per hour"

    # CORS: comma-separated list. Blank entries are dropped so an unset
    # variable yields [] rather than [''], and stray spaces are trimmed.
    CORS_ORIGINS = [
        origin.strip()
        for origin in os.environ.get('CORS_ORIGINS', '').split(',')
        if origin.strip()
    ]

    # HTTPS
    SSL_REDIRECT = True
    SSL_CERT_PATH = os.environ.get('SSL_CERT_PATH')
    SSL_KEY_PATH = os.environ.get('SSL_KEY_PATH')

    # Logging
    LOG_LEVEL = 'INFO'
    SECURITY_LOG_FILE = '/var/log/api_security.log'

    @classmethod
    def validate_config(cls):
        """Validate required security configuration.

        Raises:
            RuntimeError: listing every required environment variable that
                is unset or empty at the time of the call.
        """
        missing_vars = [
            var for var in cls.REQUIRED_ENV_VARS if not os.environ.get(var)
        ]
        if missing_vars:
            raise RuntimeError(
                f"Missing required environment variables: {', '.join(missing_vars)}"
            )
def create_secure_app():
    """Create Flask app with security configurations.

    Raises:
        RuntimeError: from SecurityConfig.validate_config() when required
            environment variables are missing.
    """
    # Validate security configuration first so a misconfigured deployment
    # fails before anything is constructed.
    SecurityConfig.validate_config()

    app = Flask(__name__)

    # Configure JWT. The JWTManager defined in this file takes the signing
    # secret (not the app) as its constructor argument.
    jwt_manager = JWTManager(SecurityConfig.JWT_SECRET_KEY)
    # Keep the manager reachable from the app instead of discarding it.
    app.extensions['jwt_manager'] = jwt_manager

    # Configure CORS
    configure_cors(app)

    # Configure rate limiting. Keyword arguments are used because the
    # positional order of Limiter's parameters differs between Flask-Limiter
    # major versions.
    Limiter(
        app=app,
        key_func=get_remote_address,
        storage_uri=SecurityConfig.RATE_LIMIT_STORAGE_URL
    )

    # Configure logging
    configure_security_logging(app)

    return app
class _JsonLogFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record):
        payload = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
        }
        if record.exc_info:
            payload['exception'] = self.formatException(record.exc_info)
        # json.dumps escapes quotes and newlines, so a message containing
        # them can neither break the JSON nor forge extra log lines.
        return json.dumps(payload)


def configure_security_logging(app):
    """Configure security-focused logging.

    Attaches a JSON-formatting file handler for
    ``SecurityConfig.SECURITY_LOG_FILE`` to the ``security`` logger. Calling
    this more than once does not add duplicate handlers.

    Args:
        app: The Flask application (currently unused).

    Returns:
        The configured ``security`` logger.
    """
    security_logger = logging.getLogger('security')
    security_logger.setLevel(logging.INFO)

    # Skip setup if a handler for this file is already attached; otherwise
    # every event would be written once per call.
    log_path = os.path.abspath(SecurityConfig.SECURITY_LOG_FILE)
    for handler in security_logger.handlers:
        if isinstance(handler, logging.FileHandler) and handler.baseFilename == log_path:
            return security_logger

    # File handler for security events
    file_handler = logging.FileHandler(SecurityConfig.SECURITY_LOG_FILE)
    file_handler.setLevel(logging.INFO)

    # JSON formatter for log aggregation
    file_handler.setFormatter(_JsonLogFormatter())
    security_logger.addHandler(file_handler)

    return security_logger
# Security monitoring
class SecurityMonitor:
    """Counts failed authentication attempts in Redis and escalates repeats."""

    def __init__(self):
        self.redis_client = redis.Redis()
        self.logger = logging.getLogger('security')

    def log_failed_auth(self, identifier: str, reason: str):
        """Record one failed authentication for *identifier*.

        The per-identifier counter is incremented and its expiry is set to
        one hour on every call. From the fifth counted failure onward the
        security team is alerted on each call.
        """
        counter_key = f"failed_auth:{identifier}"
        failures = self.redis_client.incr(counter_key)
        self.redis_client.expire(counter_key, 3600)  # 1 hour window

        message = (
            f"Failed authentication: {identifier} - {reason} "
            f"(count: {failures})"
        )
        self.logger.warning(message)

        # Alert if too many failures
        if failures < 5:
            return
        self.alert_security_team(identifier, failures)

    def alert_security_team(self, identifier: str, count: int):
        """Emit a CRITICAL log entry describing the suspicious activity."""
        alert_data = dict(
            type='multiple_failed_auth',
            identifier=identifier,
            count=count,
            timestamp=datetime.utcnow().isoformat(),
        )
        # In production, send to alerting system (PagerDuty, Slack, etc.)
        self.logger.critical(f"SECURITY ALERT: {alert_data}")
# Module-level SecurityMonitor instance; constructing it calls redis.Redis()
# and looks up the 'security' logger in SecurityMonitor.__init__.
security_monitor = SecurityMonitor()
Final Thoughts: Security as a Mindset
That embarrassing API key disaster taught me that security isn't a feature you add at the end – it's a mindset you adopt from the first line of code. Every endpoint, every parameter, every response is a potential attack vector that needs careful consideration.
The security measures I've shared aren't academic exercises – they're battle-tested defenses against real attacks I've seen in production. From brute force attempts to sophisticated OAuth exploits, understanding these patterns helps you build systems that protect both your users and your business.
Start with the basics: strong authentication, proper input validation, and rate limiting. Then layer on advanced techniques like OAuth flows, comprehensive logging, and security monitoring. Remember: security is a process, not a product.
The goal isn't to build an impenetrable fortress (impossible), but to make your system expensive enough to attack that adversaries move on to easier targets. Every security measure you implement raises the bar for potential attackers.
Currently writing this from Analog Coffee in Capitol Hill, where I'm implementing OAuth flows while enjoying my usual cortado. Share your API security war stories @maya_codes_pnw - we've all learned the hard way! 🔐☕
Add Comment
No comments yet. Be the first to comment!