API Authentication Security 2025 Complete Guide

Master API security from a developer who built auth systems at Amazon and Microsoft, covering JWT, OAuth, rate limiting, and security best practices with real implementations.

API Authentication & Security: From Open Doors to Fort Knox

It was my second month at Amazon, and I was tasked with adding "simple authentication" to our internal API. How hard could it be? I added a basic API key check and called it done. Three days later, our security team found that anyone could generate valid API keys just by incrementing the last digit. That "simple" fix led to a complete security audit and a crash course in API security that changed how I think about authentication forever.

Since then, I've designed authentication systems for millions of users, implemented OAuth flows for enterprise clients, and learned that API security isn't just about keeping bad actors out – it's about building trust with every single request.

The Great API Key Disaster of 2022

Here's the "brilliant" authentication system that got me in trouble:

# My original "secure" API - spoiler alert: it wasn't
import random
from flask import Flask, request

app = Flask(__name__)

def generate_api_key():
    # Generate a "random" API key
    return f"api_key_{random.randint(1000, 9999)}"

def validate_api_key(api_key):
    # Check if API key follows our format
    if api_key.startswith("api_key_") and len(api_key) == 12:
        key_number = int(api_key.split("_")[2])
        if 1000 <= key_number <= 9999:
            return True
    return False

@app.route('/api/users')
def get_users():
    api_key = request.headers.get('Authorization')
    
    if not validate_api_key(api_key):
        return {"error": "Invalid API key"}, 401
    
    # Return sensitive user data - YIKES!
    return {"users": [{"id": 1, "email": "maya@company.com", "ssn": "123-45-6789"}]}

# The security team's reaction when they found this:
# for i in range(1000, 10000):
#     response = requests.get('/api/users', headers={'Authorization': f'api_key_{i}'})
#     if response.status_code == 200:
#         print(f"FOUND VALID KEY: api_key_{i}")
#         break
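
To put numbers on why that loop finishes almost instantly, here is a small sketch comparing the search space of the four-digit scheme with a 32-byte token from the standard-library `secrets` module. The helper name `keyspace_bits` is made up for this illustration.

```python
import math
import secrets

def keyspace_bits(num_possible_keys: int) -> float:
    """Return the size of a key space in bits (log2 of the number of keys)."""
    return math.log2(num_possible_keys)

# The original scheme: random.randint(1000, 9999) yields 9000 possible keys,
# and the validator accepted every key in that range, issued or not.
weak_keys = 9999 - 1000 + 1
weak_bits = keyspace_bits(weak_keys)

# secrets.token_urlsafe(32) encodes 32 random bytes, i.e. 256 bits of entropy.
strong_bits = 32 * 8
sample_key = secrets.token_urlsafe(32)

print(f"weak scheme:   {weak_keys} keys (~{weak_bits:.1f} bits)")
print(f"strong scheme: 2**{strong_bits} keys ({strong_bits} bits)")
print(f"sample key length: {len(sample_key)} characters")
```

Roughly 13 bits against 256 bits is the gap between a script that runs over lunch and one that never finishes.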

Needless to say, this approach had some... issues. Here's how I rebuilt it properly:

# Secure API authentication system
import jwt
import secrets
import hashlib
import json
import os
import time
from datetime import datetime, timedelta
from functools import wraps
import redis
import logging
from flask import request

class APIKeyManager:
    def __init__(self):
        self.redis_client = redis.Redis()
        self.logger = logging.getLogger(__name__)
    
    def generate_api_key(self, user_id: str, description: str = "") -> dict:
        """Generate a cryptographically secure API key"""
        # Generate a random, URL-safe key from 32 bytes of OS-level randomness
        api_key = secrets.token_urlsafe(32)
        
        # Create key hash for storage (never store raw keys!)
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        
        # Store key metadata
        key_data = {
            'user_id': user_id,
            'key_hash': key_hash,
            'description': description,
            'created_at': datetime.utcnow().isoformat(),
            'last_used': None,
            'usage_count': 0,
            'is_active': True
        }
        
        # Store in database
        key_id = self._store_api_key(key_data)
        
        # Cache for fast lookup
        self.redis_client.setex(
            f"api_key:{key_hash}",
            86400,  # 24 hours cache
            json.dumps(key_data)
        )
        
        self.logger.info(f"API key generated for user {user_id}")
        
        return {
            'key_id': key_id,
            'api_key': f"ask_{api_key}",  # Prefix for identification
            'created_at': key_data['created_at']
        }
    
    def validate_api_key(self, api_key: str) -> dict:
        """Validate API key and return user data"""
        if not api_key or not api_key.startswith('ask_'):
            return None
        
        # Extract actual key
        raw_key = api_key[4:]  # Remove 'ask_' prefix
        key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
        
        # Check cache first
        cached_data = self.redis_client.get(f"api_key:{key_hash}")
        if cached_data:
            key_data = json.loads(cached_data)
        else:
            # Check database
            key_data = self._get_api_key_data(key_hash)
            if key_data:
                # Cache for future requests
                self.redis_client.setex(
                    f"api_key:{key_hash}",
                    86400,
                    json.dumps(key_data)
                )
        
        if not key_data or not key_data.get('is_active'):
            return None
        
        # Update usage stats
        self._update_key_usage(key_hash)
        
        return key_data
    
    def revoke_api_key(self, key_id: str) -> bool:
        """Revoke an API key"""
        success = self._deactivate_api_key(key_id)
        if success:
            # Clear from cache
            key_data = self._get_api_key_by_id(key_id)
            if key_data:
                self.redis_client.delete(f"api_key:{key_data['key_hash']}")
        return success

# JWT Token Manager
class JWTManager:
    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        self.algorithm = 'HS256'
        self.access_token_expires = timedelta(minutes=15)
        self.refresh_token_expires = timedelta(days=7)
        self.redis_client = redis.Redis()
    
    def generate_tokens(self, user_id: str, user_role: str = 'user') -> dict:
        """Generate access and refresh tokens"""
        now = datetime.utcnow()
        
        # Access token (short-lived)
        access_payload = {
            'user_id': user_id,
            'role': user_role,
            'type': 'access',
            'iat': now,
            'exp': now + self.access_token_expires,
            'jti': secrets.token_urlsafe(16)  # JWT ID for tracking
        }
        
        # Refresh token (long-lived)
        refresh_payload = {
            'user_id': user_id,
            'type': 'refresh',
            'iat': now,
            'exp': now + self.refresh_token_expires,
            'jti': secrets.token_urlsafe(16)
        }
        
        access_token = jwt.encode(access_payload, self.secret_key, algorithm=self.algorithm)
        refresh_token = jwt.encode(refresh_payload, self.secret_key, algorithm=self.algorithm)
        
        # Store refresh token for validation
        self.redis_client.setex(
            f"refresh_token:{refresh_payload['jti']}",
            int(self.refresh_token_expires.total_seconds()),
            user_id
        )
        
        return {
            'access_token': access_token,
            'refresh_token': refresh_token,
            'token_type': 'Bearer',
            'expires_in': int(self.access_token_expires.total_seconds())
        }
    
    def validate_access_token(self, token: str) -> dict:
        """Validate and decode access token"""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
            
            # Check token type
            if payload.get('type') != 'access':
                return None
            
            # Check if token is blacklisted
            if self.redis_client.get(f"blacklist:{payload['jti']}"):
                return None
            
            return payload
            
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            return None
    
    def refresh_access_token(self, refresh_token: str) -> dict:
        """Generate new access token using refresh token"""
        try:
            payload = jwt.decode(refresh_token, self.secret_key, algorithms=[self.algorithm])
            
            if payload.get('type') != 'refresh':
                return None
            
            # Check if refresh token exists in Redis
            user_id = self.redis_client.get(f"refresh_token:{payload['jti']}")
            if not user_id:
                return None
            
            user_id = user_id.decode('utf-8')
            
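            # Rotate: invalidate the used refresh token, then issue a new token pair
            self.redis_client.delete(f"refresh_token:{payload['jti']}")
            return self.generate_tokens(user_id, 'user')  # You'd get role from DB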
            
        except jwt.InvalidTokenError:
            return None
    
    def revoke_token(self, token: str) -> bool:
        """Add token to blacklist"""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
            
            # Add to blacklist until expiration
            ttl = payload['exp'] - time.time()
            if ttl > 0:
                self.redis_client.setex(
                    f"blacklist:{payload['jti']}",
                    int(ttl),
                    'revoked'
                )
            
            return True
        except jwt.InvalidTokenError:
            return False

# Rate Limiting
class RateLimiter:
    def __init__(self):
        self.redis_client = redis.Redis()
    
    def check_rate_limit(self, identifier: str, limit: int, window: int) -> dict:
        """
        Check if request is within rate limit
        
        Args:
            identifier: User ID, IP address, or API key
            limit: Maximum requests allowed
            window: Time window in seconds
            
        Returns:
            dict: Rate limit status and metadata
        """
        current_time = int(time.time())
        window_start = current_time - window
        
        # Sliding window log approach
        key = f"rate_limit:{identifier}"
        
        # Remove old entries
        self.redis_client.zremrangebyscore(key, 0, window_start)
        
        # Get current request count
        current_requests = self.redis_client.zcard(key)
        
        if current_requests >= limit:
            # Rate limit exceeded
            oldest_request = self.redis_client.zrange(key, 0, 0, withscores=True)
            if oldest_request:
                reset_time = int(oldest_request[0][1]) + window
            else:
                reset_time = current_time + window
            
            return {
                'allowed': False,
                'limit': limit,
                'remaining': 0,
                'reset_time': reset_time,
                'retry_after': reset_time - current_time
            }
        
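        # Add current request. The random suffix keeps members unique, so several
        # requests in the same second are each counted instead of overwriting one entry
        member = f"{current_time}:{secrets.token_hex(8)}"
        self.redis_client.zadd(key, {member: current_time})
        self.redis_client.expire(key, window)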
        
        return {
            'allowed': True,
            'limit': limit,
            'remaining': limit - current_requests - 1,
            'reset_time': current_time + window,
            'retry_after': 0
        }

# Authentication Decorators
def require_api_key(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        api_key = request.headers.get('X-API-Key')
        
        if not api_key:
            return {'error': 'API key required'}, 401
        
        key_data = api_key_manager.validate_api_key(api_key)
        if not key_data:
            return {'error': 'Invalid API key'}, 401
        
        # Add user context to request
        request.user_id = key_data['user_id']
        request.auth_method = 'api_key'
        
        return f(*args, **kwargs)
    
    return decorated_function

def require_jwt_token(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        auth_header = request.headers.get('Authorization')
        
        if not auth_header or not auth_header.startswith('Bearer '):
            return {'error': 'Bearer token required'}, 401
        
        token = auth_header.split(' ')[1]
        payload = jwt_manager.validate_access_token(token)
        
        if not payload:
            return {'error': 'Invalid or expired token'}, 401
        
        # Add user context to request
        request.user_id = payload['user_id']
        request.user_role = payload['role']
        request.auth_method = 'jwt'
        
        return f(*args, **kwargs)
    
    return decorated_function

def rate_limit(limit: int, window: int = 3600):
    """Rate limiting decorator"""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # Use user ID if authenticated, otherwise IP
            if hasattr(request, 'user_id'):
                identifier = f"user:{request.user_id}"
            else:
                identifier = f"ip:{request.remote_addr}"
            
            rate_limit_result = rate_limiter.check_rate_limit(identifier, limit, window)
            
            if not rate_limit_result['allowed']:
                response = {
                    'error': 'Rate limit exceeded',
                    'retry_after': rate_limit_result['retry_after']
                }
                headers = {
                    'X-RateLimit-Limit': str(limit),
                    'X-RateLimit-Remaining': '0',
                    'X-RateLimit-Reset': str(rate_limit_result['reset_time']),
                    'Retry-After': str(rate_limit_result['retry_after'])
                }
                return response, 429, headers
            
            # Add rate limit headers to response
            response = f(*args, **kwargs)
            if isinstance(response, tuple):
                data, status_code = response[:2]
                headers = response[2] if len(response) > 2 else {}
            else:
                data, status_code, headers = response, 200, {}
            
            headers.update({
                'X-RateLimit-Limit': str(limit),
                'X-RateLimit-Remaining': str(rate_limit_result['remaining']),
                'X-RateLimit-Reset': str(rate_limit_result['reset_time'])
            })
            
            return data, status_code, headers
        
        return decorated_function
    return decorator

# Initialize managers
api_key_manager = APIKeyManager()
jwt_manager = JWTManager(os.environ['JWT_SECRET'])  # Fail fast if unset; never fall back to a hardcoded secret
rate_limiter = RateLimiter()
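
The sliding window log behind `RateLimiter` is easier to see without Redis in the way. The sketch below is a single-process, in-memory version of the same idea, meant for experimenting rather than production. The class name `InMemorySlidingWindowLimiter` and the `now` parameter (injected so the example is deterministic) are assumptions for this illustration.

```python
import time
from collections import defaultdict, deque
from typing import Optional

class InMemorySlidingWindowLimiter:
    """Sliding-window-log limiter for a single process (illustrative only)."""

    def __init__(self, limit: int, window: float):
        self.limit = limit
        self.window = window
        # identifier -> timestamps of accepted requests, oldest first
        self._log = defaultdict(deque)

    def allow(self, identifier: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        timestamps = self._log[identifier]
        # Drop entries at or before the start of the window
        while timestamps and timestamps[0] <= now - self.window:
            timestamps.popleft()
        if len(timestamps) >= self.limit:
            return False  # Window is full; rejected requests are not logged
        timestamps.append(now)
        return True

# 3 requests per 60 seconds: the 4th is rejected, and a request at t=61
# is accepted again because the entries from t=0 and t=1 have aged out.
demo_limiter = InMemorySlidingWindowLimiter(limit=3, window=60)
results = [demo_limiter.allow("user:42", now=t) for t in (0, 1, 2, 3, 61)]
print(results)  # [True, True, True, False, True]
```

The Redis version trades this simplicity for state shared across workers, which is why it uses a sorted set keyed by identifier.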

OAuth 2.0 Implementation

# OAuth 2.0 Authorization Server
import base64
import urllib.parse
from typing import Optional

import requests
from flask import redirect, session

class OAuthAuthorizationServer:
    def __init__(self):
        self.redis_client = redis.Redis()
        self.clients = {}  # In production, load from database
        self.logger = logging.getLogger(__name__)
    
    def register_client(self, client_name: str, redirect_uris: list) -> dict:
        """Register OAuth client application"""
        client_id = secrets.token_urlsafe(16)
        client_secret = secrets.token_urlsafe(32)
        
        client_data = {
            'client_id': client_id,
            'client_secret': client_secret,
            'name': client_name,
            'redirect_uris': redirect_uris,
            'created_at': datetime.utcnow().isoformat()
        }
        
        # Store client data (in production, use database)
        self.clients[client_id] = client_data
        
        return {
            'client_id': client_id,
            'client_secret': client_secret
        }
    
    def generate_authorization_code(self, client_id: str, user_id: str, 
                                  redirect_uri: str, scope: str) -> str:
        """Generate authorization code for OAuth flow"""
        if client_id not in self.clients:
            raise ValueError("Invalid client_id")
        
        client = self.clients[client_id]
        if redirect_uri not in client['redirect_uris']:
            raise ValueError("Invalid redirect_uri")
        
        # Generate authorization code
        auth_code = secrets.token_urlsafe(32)
        
        # Store authorization code data
        code_data = {
            'client_id': client_id,
            'user_id': user_id,
            'redirect_uri': redirect_uri,
            'scope': scope,
            'created_at': time.time()
        }
        
        # Authorization codes expire in 10 minutes
        self.redis_client.setex(
            f"auth_code:{auth_code}",
            600,
            json.dumps(code_data)
        )
        
        return auth_code
    
    def exchange_code_for_token(self, client_id: str, client_secret: str,
                               auth_code: str, redirect_uri: str) -> dict:
        """Exchange authorization code for access token"""
        # Validate client credentials
        if client_id not in self.clients:
            raise ValueError("Invalid client")
        
        client = self.clients[client_id]
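        # Compare in constant time so response timing does not leak the secret
        if not client_secret or not secrets.compare_digest(
                client['client_secret'].encode(), client_secret.encode()):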
            raise ValueError("Invalid client credentials")
        
        # Get and validate authorization code
        code_data_json = self.redis_client.get(f"auth_code:{auth_code}")
        if not code_data_json:
            raise ValueError("Invalid or expired authorization code")
        
        code_data = json.loads(code_data_json)
        
        # Validate code belongs to this client and redirect URI
        if (code_data['client_id'] != client_id or 
            code_data['redirect_uri'] != redirect_uri):
            raise ValueError("Authorization code mismatch")
        
        # Delete used authorization code
        self.redis_client.delete(f"auth_code:{auth_code}")
        
        # Generate access token
        access_token = secrets.token_urlsafe(32)
        refresh_token = secrets.token_urlsafe(32)
        
        token_data = {
            'access_token': access_token,
            'token_type': 'Bearer',
            'expires_in': 3600,
            'refresh_token': refresh_token,
            'scope': code_data['scope'],
            'user_id': code_data['user_id'],
            'client_id': client_id
        }
        
        # Store access token
        self.redis_client.setex(
            f"access_token:{access_token}",
            3600,
            json.dumps(token_data)
        )
        
        # Store refresh token (longer expiry)
        self.redis_client.setex(
            f"refresh_token:{refresh_token}",
            86400 * 30,  # 30 days
            json.dumps(token_data)
        )
        
        self.logger.info(f"Access token issued for user {code_data['user_id']}")
        
        return {
            'access_token': access_token,
            'token_type': 'Bearer',
            'expires_in': 3600,
            'refresh_token': refresh_token,
            'scope': code_data['scope']
        }
    
    def validate_access_token(self, access_token: str) -> Optional[dict]:
        """Validate OAuth access token"""
        token_data_json = self.redis_client.get(f"access_token:{access_token}")
        if not token_data_json:
            return None
        
        return json.loads(token_data_json)

# OAuth Client Implementation
class OAuthClient:
    def __init__(self, client_id: str, client_secret: str, 
                 authorization_url: str, token_url: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.authorization_url = authorization_url
        self.token_url = token_url
    
    def get_authorization_url(self, redirect_uri: str, scope: str, state: str = None) -> str:
        """Generate authorization URL for user to visit"""
        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': redirect_uri,
            'scope': scope
        }
        
        if state:
            params['state'] = state
        
        return f"{self.authorization_url}?{urllib.parse.urlencode(params)}"
    
    def exchange_code_for_token(self, code: str, redirect_uri: str) -> dict:
        """Exchange authorization code for access token"""
        auth_header = base64.b64encode(
            f"{self.client_id}:{self.client_secret}".encode()
        ).decode()
        
        headers = {
            'Authorization': f'Basic {auth_header}',
            'Content-Type': 'application/x-www-form-urlencoded'
        }
        
        data = {
            'grant_type': 'authorization_code',
            'code': code,
            'redirect_uri': redirect_uri
        }
        
        # Always set a timeout so a stalled token endpoint cannot hang the caller
        response = requests.post(self.token_url, headers=headers, data=data, timeout=10)
        response.raise_for_status()
        
        return response.json()

# OAuth Routes
@app.route('/oauth/authorize')
def oauth_authorize():
    """OAuth authorization endpoint"""
    client_id = request.args.get('client_id')
    redirect_uri = request.args.get('redirect_uri')
    scope = request.args.get('scope', 'read')
    state = request.args.get('state')
    
    # Validate client
    if client_id not in oauth_server.clients:
        return {'error': 'invalid_client'}, 400
    
    # In a real app, you'd show a consent screen here
    # For demo, we'll auto-approve
    
    # Check if user is logged in
    user_id = session.get('user_id')
    if not user_id:
        # Redirect to login with return URL
        login_url = f"/login?return_to={urllib.parse.quote(request.url)}"
        return redirect(login_url)
    
    # Generate authorization code
    try:
        auth_code = oauth_server.generate_authorization_code(
            client_id, user_id, redirect_uri, scope
        )
        
        # Redirect back to client with code
        params = {'code': auth_code}
        if state:
            params['state'] = state
        
        redirect_url = f"{redirect_uri}?{urllib.parse.urlencode(params)}"
        return redirect(redirect_url)
        
    except ValueError as e:
        return {'error': str(e)}, 400

@app.route('/oauth/token', methods=['POST'])
def oauth_token():
    """OAuth token endpoint"""
    # Get client credentials from Authorization header or form data
    auth_header = request.headers.get('Authorization')
    
    if auth_header and auth_header.startswith('Basic '):
        # Decode basic auth
        credentials = base64.b64decode(auth_header[6:]).decode()
        client_id, client_secret = credentials.split(':', 1)
    else:
        client_id = request.form.get('client_id')
        client_secret = request.form.get('client_secret')
    
    grant_type = request.form.get('grant_type')
    
    if grant_type == 'authorization_code':
        code = request.form.get('code')
        redirect_uri = request.form.get('redirect_uri')
        
        try:
            token_data = oauth_server.exchange_code_for_token(
                client_id, client_secret, code, redirect_uri
            )
            return token_data
            
        except ValueError as e:
            return {'error': 'invalid_grant', 'error_description': str(e)}, 400
    
    return {'error': 'unsupported_grant_type'}, 400
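
The `state` parameter in the flow above only protects against CSRF if the client generates an unguessable value and checks it on the callback. Here is a minimal sketch of that check, assuming the value is kept in the user's server-side session between the redirect and the callback. The function names `new_oauth_state` and `state_matches` are hypothetical.

```python
import secrets
from typing import Optional

def new_oauth_state() -> str:
    """Create an unguessable state value to store in the user's session."""
    return secrets.token_urlsafe(24)

def state_matches(expected: Optional[str], received: Optional[str]) -> bool:
    """Compare the stored and returned state values in constant time."""
    if not expected or not received:
        return False  # A missing state on either side is always a failure
    return secrets.compare_digest(expected.encode(), received.encode())

# Before redirecting to the authorization URL, keep the value server-side,
# for example: session['oauth_state'] = stored_state
stored_state = new_oauth_state()

# In the callback handler, compare it with request.args.get('state')
print(state_matches(stored_state, stored_state))      # True
print(state_matches(stored_state, "attacker-value"))  # False
print(state_matches(stored_state, None))              # False
```

If the check fails, the safest response is to abandon the flow without exchanging the code.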

Security Best Practices

CORS (Cross-Origin Resource Sharing)

from flask import make_response
from flask_cors import CORS

# Secure CORS configuration
def configure_cors(app):
    """Configure CORS with security best practices"""
    
    # Don't use CORS(app) - too permissive!
    
    # Specific origins only
    allowed_origins = [
        'https://myapp.com',
        'https://api.myapp.com',
        'https://admin.myapp.com'
    ]
    
    # Development origins (only in development)
    if app.config.get('DEBUG'):
        allowed_origins.extend([
            'http://localhost:3000',
            'http://localhost:8080',
            'http://127.0.0.1:3000'
        ])
    
    CORS(app, 
         origins=allowed_origins,
         methods=['GET', 'POST', 'PUT', 'DELETE'],
         allow_headers=['Content-Type', 'Authorization', 'X-API-Key'],
         expose_headers=['X-RateLimit-Limit', 'X-RateLimit-Remaining'],
         supports_credentials=True,
         max_age=3600)

# Custom CORS headers for specific routes
@app.route('/api/public-data')
def public_data():
    """Public endpoint with relaxed CORS"""
    response = make_response({'data': 'public'})
    response.headers['Access-Control-Allow-Origin'] = '*'
    response.headers['Access-Control-Allow-Methods'] = 'GET'
    return response

@app.route('/api/sensitive-data')
@require_jwt_token
def sensitive_data():
    """Sensitive endpoint with strict CORS"""
    response = make_response({'data': 'sensitive'})
    response.headers['Access-Control-Allow-Origin'] = 'https://myapp.com'
    response.headers['Access-Control-Allow-Credentials'] = 'true'
    return response
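
Hardcoding a single origin per route works until a second front end needs the same endpoint. A common alternative is to echo the request's `Origin` header only when it is on an allowlist. The sketch below shows that decision as a plain function, so it can be tested without Flask. The name `cors_headers_for` and the origin set are assumptions for illustration.

```python
from typing import Dict, Optional

ALLOWED_ORIGINS = frozenset({
    "https://myapp.com",
    "https://admin.myapp.com",
})

def cors_headers_for(origin: Optional[str]) -> Dict[str, str]:
    """Return the CORS response headers for a request's Origin header."""
    if origin is None or origin not in ALLOWED_ORIGINS:
        # Unknown origin: send no CORS headers, so the browser blocks the read
        return {}
    return {
        # Echo the exact origin; browsers reject '*' on credentialed requests
        "Access-Control-Allow-Origin": origin,
        "Access-Control-Allow-Credentials": "true",
        # Tell caches that the response varies by Origin
        "Vary": "Origin",
    }

print(cors_headers_for("https://myapp.com"))
print(cors_headers_for("https://myapp.com.evil.example"))  # {}
```

Exact string matching matters here: prefix or substring checks are what let look-alike domains such as the second example through.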

Input Validation and Sanitization

from marshmallow import Schema, fields, validate, ValidationError
import bleach

class UserRegistrationSchema(Schema):
    """Schema for user registration validation"""
    email = fields.Email(required=True, validate=validate.Length(max=255))
    password = fields.Str(required=True, validate=validate.Length(min=8, max=128))
    first_name = fields.Str(required=True, validate=validate.Length(min=1, max=50))
    last_name = fields.Str(required=True, validate=validate.Length(min=1, max=50))
    age = fields.Int(validate=validate.Range(min=13, max=120))
    website = fields.Url(load_default=None)  # 'missing=' is deprecated in marshmallow 3.13+

class APIInputValidator:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
    
    def validate_json_input(self, schema: Schema, data: dict) -> dict:
        """Validate and sanitize JSON input"""
        try:
            # Validate against schema
            validated_data = schema.load(data)
            
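            # Sanitize string fields, but leave the password untouched:
            # stripping characters from it would silently change the credential
            for key, value in validated_data.items():
                if isinstance(value, str) and key != 'password':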
                    # Remove potentially dangerous HTML/script tags
                    validated_data[key] = bleach.clean(
                        value,
                        tags=[],  # No HTML tags allowed
                        attributes={},
                        strip=True
                    ).strip()
            
            return validated_data
            
        except ValidationError as e:
            self.logger.warning(f"Validation error: {e.messages}")
            raise ValueError(f"Invalid input: {e.messages}")
    
    def validate_query_params(self, allowed_params: dict) -> dict:
        """Validate query parameters"""
        validated_params = {}
        
        for param, validator in allowed_params.items():
            value = request.args.get(param)
            
            if value is not None:
                try:
                    # Apply validator function
                    validated_params[param] = validator(value)
                except ValueError as e:
                    raise ValueError(f"Invalid parameter '{param}': {str(e)}")
        
        return validated_params

def validate_input(schema: Schema):
    """Decorator for input validation"""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            validator = APIInputValidator()
            
            try:
                # Validate JSON body
                if request.is_json:
                    validated_data = validator.validate_json_input(schema, request.json)
                    request.validated_data = validated_data
                
                return f(*args, **kwargs)
                
            except ValueError as e:
                return {'error': str(e)}, 400
        
        return decorated_function
    return decorator

# Usage
@app.route('/api/users', methods=['POST'])
@rate_limit(10, 3600)  # 10 registrations per hour; outermost so rejected payloads count too
@validate_input(UserRegistrationSchema())
def create_user():
    """Create new user with validation"""
    user_data = request.validated_data
    
    # Additional business logic validation
    if user_repository.email_exists(user_data['email']):
        return {'error': 'Email already registered'}, 409
    
    # Create user
    user_id = user_service.create_user(user_data)
    
    return {'user_id': user_id, 'message': 'User created successfully'}, 201
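
`validate_query_params` expects a dict that maps each parameter name to a validator function, but the code above never shows what such a dict looks like. Here is one way it could be built, as a sketch. The helpers `bounded_int` and `one_of`, the `LIST_USERS_PARAMS` table, and the standalone `validate_params` function (which takes the raw args explicitly so it runs outside a request) are all assumptions for illustration.

```python
from typing import Any, Callable, Dict, Mapping

def bounded_int(minimum: int, maximum: int) -> Callable[[str], int]:
    """Build a validator that parses an int and enforces a range."""
    def validate(raw: str) -> int:
        value = int(raw)  # Raises ValueError on non-numeric input
        if not minimum <= value <= maximum:
            raise ValueError(f"must be between {minimum} and {maximum}")
        return value
    return validate

def one_of(*choices: str) -> Callable[[str], str]:
    """Build a validator that accepts only the listed values."""
    def validate(raw: str) -> str:
        if raw not in choices:
            raise ValueError(f"must be one of {', '.join(choices)}")
        return raw
    return validate

def validate_params(raw_args: Mapping[str, str],
                    allowed_params: Dict[str, Callable[[str], Any]]) -> Dict[str, Any]:
    """Same loop as validate_query_params, with the args passed in explicitly."""
    validated = {}
    for param, validator in allowed_params.items():
        value = raw_args.get(param)
        if value is not None:
            try:
                validated[param] = validator(value)
            except ValueError as e:
                raise ValueError(f"Invalid parameter '{param}': {e}")
    return validated

LIST_USERS_PARAMS = {
    "page": bounded_int(1, 10_000),
    "per_page": bounded_int(1, 100),
    "sort": one_of("created_at", "email"),
}

# Unknown parameters such as 'debug' are dropped rather than passed through
print(validate_params({"page": "2", "sort": "email", "debug": "1"}, LIST_USERS_PARAMS))
# {'page': 2, 'sort': 'email'}
```

Because only allowlisted names are copied into the result, anything the handler did not ask for never reaches the business logic.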

API Security Headers

@app.after_request
def add_security_headers(response):
    """Add security headers to all responses"""
    
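    # Stop browsers from MIME-sniffing responses into a different content type
    response.headers['X-Content-Type-Options'] = 'nosniff'
    # Block framing to prevent clickjacking
    response.headers['X-Frame-Options'] = 'DENY'
    # Disable the legacy XSS auditor: modern browsers have removed it, and
    # OWASP recommends '0' because the filter itself could be abused
    response.headers['X-XSS-Protection'] = '0'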
    
    # HTTPS enforcement
    response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
    
    # Content Security Policy ('unsafe-inline' is left out of script-src
    # because it would let injected inline scripts run)
    response.headers['Content-Security-Policy'] = (
        "default-src 'self'; "
        "script-src 'self'; "
        "style-src 'self' 'unsafe-inline'; "
        "img-src 'self' data: https:; "
        "connect-src 'self'"
    )
    
    # Referrer Policy
    response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
    
    # Permissions Policy
    response.headers['Permissions-Policy'] = (
        "camera=(), microphone=(), geolocation=()"
    )
    
    return response

# API-specific security middleware
class APISecurityMiddleware:
    def __init__(self, app):
        self.app = app
        self.logger = logging.getLogger(__name__)
    
    def __call__(self, environ, start_response):
        # Log all API requests
        if environ['PATH_INFO'].startswith('/api/'):
            self.logger.info(
                f"API Request: {environ['REQUEST_METHOD']} {environ['PATH_INFO']} "
                f"from {environ.get('REMOTE_ADDR')}"
            )
            
            # Check for suspicious patterns
            self._check_suspicious_activity(environ)
        
        return self.app(environ, start_response)
    
    def _check_suspicious_activity(self, environ):
        """Check for suspicious request patterns"""
        user_agent = environ.get('HTTP_USER_AGENT', '')
        path = environ.get('PATH_INFO', '')
        
        # Check for common attack patterns
        suspicious_patterns = [
            '../',  # Path traversal
            '<script',  # XSS attempts
            'union select',  # SQL injection
            'eval(',  # Code injection
        ]
        
        for pattern in suspicious_patterns:
            if pattern.lower() in path.lower() or pattern.lower() in user_agent.lower():
                self.logger.warning(
                    f"Suspicious request detected: {pattern} in "
                    f"{environ['REQUEST_METHOD']} {path} from {environ.get('REMOTE_ADDR')}"
                )
                break

# Apply middleware
app.wsgi_app = APISecurityMiddleware(app.wsgi_app)
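
One gap in the pattern check above is that it only looks at the path and user agent, so a payload in the query string (which WSGI leaves percent-encoded in `QUERY_STRING`) goes unnoticed. Below is a sketch of a standalone helper that decodes and scans all three. The function name `find_suspicious_pattern` is hypothetical, and like the middleware this is a logging heuristic, not a defense on its own.

```python
from typing import Optional
from urllib.parse import unquote_plus

SUSPICIOUS_PATTERNS = ("../", "<script", "union select", "eval(")

def find_suspicious_pattern(path: str, query_string: str = "",
                            user_agent: str = "") -> Optional[str]:
    """Return the first matching pattern, or None if nothing matches."""
    # Decode percent-encoding and '+' so "union+select" and "%2e%2e%2f" are caught
    haystack = " ".join(
        unquote_plus(part).lower() for part in (path, query_string, user_agent)
    )
    for pattern in SUSPICIOUS_PATTERNS:
        if pattern in haystack:
            return pattern
    return None

print(find_suspicious_pattern("/api/users", "q=1+UNION+SELECT+password"))   # union select
print(find_suspicious_pattern("/api/files", "name=%2e%2e%2fetc%2fpasswd"))  # ../
print(find_suspicious_pattern("/api/users", "page=2"))                      # None
```

Inside the middleware, the query string would come from `environ.get('QUERY_STRING', '')`.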

Production Security Checklist

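# Security configuration for production
import os
from datetime import timedelta

from flask import Flask
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address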
class SecurityConfig:
    """Production security configuration"""
    
    # Authentication
    JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY')  # Must be set
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(minutes=15)
    JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=7)
    
    # Rate limiting
    RATE_LIMIT_STORAGE_URL = os.environ.get('REDIS_URL')
    DEFAULT_RATE_LIMIT = "1000 per hour"
    
    # CORS
    CORS_ORIGINS = os.environ.get('CORS_ORIGINS', '').split(',')
    
    # HTTPS
    SSL_REDIRECT = True
    SSL_CERT_PATH = os.environ.get('SSL_CERT_PATH')
    SSL_KEY_PATH = os.environ.get('SSL_KEY_PATH')
    
    # Logging
    LOG_LEVEL = 'INFO'
    SECURITY_LOG_FILE = '/var/log/api_security.log'
    
    @classmethod
    def validate_config(cls):
        """Validate required security configuration"""
        required_vars = [
            'JWT_SECRET_KEY',
            'REDIS_URL',
            'DATABASE_URL'
        ]
        
        missing_vars = []
        for var in required_vars:
            if not os.environ.get(var):
                missing_vars.append(var)
        
        if missing_vars:
            raise RuntimeError(
                f"Missing required environment variables: {', '.join(missing_vars)}"
            )

def create_secure_app():
    """Create Flask app with security configurations"""
    app = Flask(__name__)
    
    # Validate security configuration
    SecurityConfig.validate_config()
    
    # Configure JWT (the JWTManager class defined earlier takes the secret key, not the app)
    jwt_manager = JWTManager(SecurityConfig.JWT_SECRET_KEY)
    
    # Configure CORS
    configure_cors(app)
    
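    # Configure rate limiting (Flask-Limiter 3.x takes key_func first and
    # the app as a keyword argument)
    limiter = Limiter(
        get_remote_address,
        app=app,
        default_limits=[SecurityConfig.DEFAULT_RATE_LIMIT],
        storage_uri=SecurityConfig.RATE_LIMIT_STORAGE_URL
    )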
    
    # Configure logging
    configure_security_logging(app)
    
    return app

def configure_security_logging(app):
    """Configure security-focused logging"""
    security_logger = logging.getLogger('security')
    security_logger.setLevel(logging.INFO)
    
    # File handler for security events
    file_handler = logging.FileHandler(SecurityConfig.SECURITY_LOG_FILE)
    file_handler.setLevel(logging.INFO)
    
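    # JSON-style format string for log aggregation. Note: %(message)s is not
    # escaped, so a message containing double quotes produces invalid JSON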
    formatter = logging.Formatter(
        '{"timestamp": "%(asctime)s", "level": "%(levelname)s", '
        '"logger": "%(name)s", "message": "%(message)s", "module": "%(module)s"}'
    )
    file_handler.setFormatter(formatter)
    
    security_logger.addHandler(file_handler)
    
    return security_logger

# Security monitoring
class SecurityMonitor:
    def __init__(self):
        self.redis_client = redis.Redis()
        self.logger = logging.getLogger('security')
    
    def log_failed_auth(self, identifier: str, reason: str):
        """Track failed authentication attempts"""
        key = f"failed_auth:{identifier}"
        current_count = self.redis_client.incr(key)
        self.redis_client.expire(key, 3600)  # 1 hour window
        
        self.logger.warning(
            f"Failed authentication: {identifier} - {reason} "
            f"(count: {current_count})"
        )
        
        # Alert if too many failures
        if current_count >= 5:
            self.alert_security_team(identifier, current_count)
    
    def alert_security_team(self, identifier: str, count: int):
        """Alert security team of suspicious activity"""
        alert_data = {
            'type': 'multiple_failed_auth',
            'identifier': identifier,
            'count': count,
            'timestamp': datetime.utcnow().isoformat()
        }
        
        # In production, send to alerting system (PagerDuty, Slack, etc.)
        self.logger.critical(f"SECURITY ALERT: {alert_data}")

security_monitor = SecurityMonitor()
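
The format string used in `configure_security_logging` only looks like JSON: a log message that contains a double quote yields a line that a log aggregator cannot parse. Here is a sketch of a formatter that builds each line with `json.dumps` instead. The class name `JsonLogFormatter` is an assumption, and the field set simply mirrors the original format string.

```python
import json
import logging

class JsonLogFormatter(logging.Formatter):
    """Format each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),  # msg with args already applied
            "module": record.module,
        }
        return json.dumps(payload)  # Escapes quotes, newlines, and non-ASCII

# Build a record by hand so the example runs without configuring handlers
record = logging.LogRecord(
    name="security", level=logging.WARNING, pathname="example.py", lineno=1,
    msg='Failed authentication: %s - reason "%s"',
    args=("ip:203.0.113.7", "bad key"), exc_info=None,
)
line = JsonLogFormatter().format(record)
parsed = json.loads(line)  # Round-trips cleanly despite the embedded quotes
print(parsed["message"])
```

To use it, pass an instance to `file_handler.setFormatter(...)` in place of the string-based formatter.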

Final Thoughts: Security as a Mindset

That embarrassing API key disaster taught me that security isn't a feature you add at the end – it's a mindset you adopt from the first line of code. Every endpoint, every parameter, every response is a potential attack vector that needs careful consideration.

The security measures I've shared aren't academic exercises – they're battle-tested defenses against real attacks I've seen in production. From brute force attempts to sophisticated OAuth exploits, understanding these patterns helps you build systems that protect both your users and your business.

Start with the basics: strong authentication, proper input validation, and rate limiting. Then layer on advanced techniques like OAuth flows, comprehensive logging, and security monitoring. Remember: security is a process, not a product.

The goal isn't to build an impenetrable fortress (impossible), but to make your system expensive enough to attack that adversaries move on to easier targets. Every security measure you implement raises the bar for potential attackers.


Currently writing this from Analog Coffee in Capitol Hill, where I'm implementing OAuth flows while enjoying my usual cortado. Share your API security war stories @maya_codes_pnw - we've all learned the hard way! 🔐☕
