Python Flask Tutorial 2025 Complete Web Development Guide REST API SQLAlchemy Authentication

Build modern web applications with Flask, guided by a developer who moved from Django to Flask at Amazon. Covers routing, databases, authentication, and deployment.

Flask Web Development: From Django Dinosaur to Flask Ninja

When I started at Amazon, I was a Django devotee. "Why would anyone use Flask?" I'd ask. "Django has everything built-in!" Then I joined a team building microservices, and suddenly Django felt like bringing a cruise ship to a kayak race. That's when I discovered Flask's elegance.

Flask taught me that sometimes less is more, and that understanding the fundamentals makes you a better developer than relying on framework magic.

Why I Fell in Love with Flask

Here's what my Django view looked like for a simple API:

# Django - lots of magic
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.contrib.auth.decorators import login_required
from rest_framework.decorators import api_view
from .models import User
from .serializers import UserSerializer

@csrf_exempt
@login_required
@api_view(['GET', 'POST'])
def user_list(request):
    if request.method == 'GET':
        users = User.objects.all()
        serializer = UserSerializer(users, many=True)
        return JsonResponse(serializer.data, safe=False)
    elif request.method == 'POST':
        # More Django magic...
        pass

And here's the equivalent in Flask:

# Flask - explicit and clear
from flask import Flask, request, jsonify
from flask_jwt_extended import jwt_required, get_jwt_identity
# User is the SQLAlchemy model defined later in this guide

app = Flask(__name__)

@app.route('/api/users', methods=['GET', 'POST'])
@jwt_required()
def user_list():
    if request.method == 'GET':
        users = User.query.all()
        return jsonify([user.to_dict() for user in users])
    elif request.method == 'POST':
        # Clear, explicit code...
        pass

The Flask version is explicit and understandable: every decorator does one job I can explain, and the rest is plain Python.

Setting Up a Modern Flask Application

Let's build a coffee shop management system (because Seattle):

Project Structure

coffee-shop-api/
├── app/
│   ├── __init__.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── product.py
│   │   └── order.py
│   ├── routes/
│   │   ├── __init__.py
│   │   ├── auth.py
│   │   ├── products.py
│   │   └── orders.py
│   ├── services/
│   │   ├── __init__.py
│   │   ├── auth_service.py
│   │   └── order_service.py
│   └── utils/
│       ├── __init__.py
│       ├── validators.py
│       └── decorators.py
├── migrations/
├── tests/
├── config.py
├── requirements.txt
└── run.py

Application Factory Pattern

# app/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_jwt_extended import JWTManager
from flask_cors import CORS
from config import Config

db = SQLAlchemy()
migrate = Migrate()
jwt = JWTManager()

def create_app(config_class=Config):
    app = Flask(__name__)
    app.config.from_object(config_class)
    
    # Initialize extensions
    db.init_app(app)
    migrate.init_app(app, db)
    jwt.init_app(app)
    CORS(app)
    
    # Register blueprints
    from app.routes.auth import auth_bp
    from app.routes.products import products_bp
    from app.routes.orders import orders_bp
    
    app.register_blueprint(auth_bp, url_prefix='/api/auth')
    app.register_blueprint(products_bp, url_prefix='/api/products')
    app.register_blueprint(orders_bp, url_prefix='/api/orders')
    
    # Error handlers
    @app.errorhandler(404)
    def not_found(error):
        return {'error': 'Resource not found'}, 404
    
    @app.errorhandler(500)
    def internal_error(error):
        return {'error': 'Internal server error'}, 500
    
    # Health check endpoint
    @app.route('/health')
    def health_check():
        return {'status': 'healthy', 'service': 'coffee-shop-api'}
    
    return app

Configuration Management

# config.py
import os
from datetime import timedelta

class Config:
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key-change-in-production'
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or 'sqlite:///coffee_shop.db'
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    
    # JWT Settings
    JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY') or 'jwt-secret-string'
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=1)
    JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30)
    
    # Coffee Shop Settings
    DEFAULT_TAX_RATE = 0.101  # Seattle tax rate
    MAX_ORDER_ITEMS = 20
    
class DevelopmentConfig(Config):
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///coffee_shop_dev.db'

class ProductionConfig(Config):
    DEBUG = False
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL')
    
class TestingConfig(Config):
    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(seconds=1)  # Short for testing

config = {
    'development': DevelopmentConfig,
    'production': ProductionConfig,
    'testing': TestingConfig,
    'default': DevelopmentConfig
}
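
One thing the config above does quietly: if the environment variables are missing, ProductionConfig ends up with None for the database URL and the hard-coded fallback secrets. A minimal fail-fast sketch using only the standard library. The require_env and load_production_settings helpers are hypothetical names for illustration, not part of Flask:

```python
import os


def require_env(name, environ=None):
    """Return the value of a required environment variable or raise.

    Hypothetical helper for illustration, not a Flask API.
    """
    environ = os.environ if environ is None else environ
    value = environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


def load_production_settings(environ=None):
    """Collect the settings ProductionConfig depends on, failing fast if one is absent."""
    required = ("SECRET_KEY", "JWT_SECRET_KEY", "DATABASE_URL")
    return {name: require_env(name, environ) for name in required}


if __name__ == "__main__":
    fake_env = {
        "SECRET_KEY": "s3cret",
        "JWT_SECRET_KEY": "jwt-s3cret",
        "DATABASE_URL": "sqlite:///prod.db",
    }
    print(load_production_settings(fake_env))
```

Calling something like this at startup turns a confusing runtime failure into an immediate, readable error.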

Database Models with SQLAlchemy

# app/models/user.py
from app import db
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime

class User(db.Model):
    __tablename__ = 'users'
    
    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    password_hash = db.Column(db.String(255), nullable=False)
    first_name = db.Column(db.String(50), nullable=False)
    last_name = db.Column(db.String(50), nullable=False)
    is_active = db.Column(db.Boolean, default=True)
    role = db.Column(db.String(20), default='customer')  # customer, barista, admin
    loyalty_points = db.Column(db.Integer, default=0)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    last_login = db.Column(db.DateTime)
    
    # Relationships
    orders = db.relationship('Order', backref='customer', lazy='dynamic')
    
    def __init__(self, email, username, password, first_name, last_name, role='customer'):
        self.email = email
        self.username = username
        self.set_password(password)
        self.first_name = first_name
        self.last_name = last_name
        self.role = role
    
    def set_password(self, password):
        """Hash and set password."""
        self.password_hash = generate_password_hash(password)
    
    def check_password(self, password):
        """Check if provided password matches hash."""
        return check_password_hash(self.password_hash, password)
    
    def to_dict(self, include_sensitive=False):
        """Convert user to dictionary."""
        data = {
            'id': self.id,
            'email': self.email,
            'username': self.username,
            'first_name': self.first_name,
            'last_name': self.last_name,
            'full_name': f"{self.first_name} {self.last_name}",
            'role': self.role,
            'loyalty_points': self.loyalty_points,
            'is_active': self.is_active,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'last_login': self.last_login.isoformat() if self.last_login else None
        }
        
        if include_sensitive:
            data['order_count'] = self.orders.count()
            # Convert the Decimal sum to float so it serializes like the other money fields
            data['total_spent'] = float(sum(order.total_amount for order in self.orders.filter_by(status='completed')))
        
        return data
    
    def add_loyalty_points(self, points):
        """Add loyalty points to user account."""
        self.loyalty_points += points
        db.session.commit()
    
    def can_redeem_points(self, points_to_redeem):
        """Check if user has enough points to redeem."""
        return self.loyalty_points >= points_to_redeem
    
    def __repr__(self):
        return f'<User {self.username}>'
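
set_password and check_password lean on Werkzeug's generate_password_hash and check_password_hash. To show the idea behind them (a random salt, a deliberately slow key-derivation function, and a constant-time comparison), here is a standard-library sketch. It is not Werkzeug's implementation or storage format; hash_password, verify_password, and the iteration count are assumptions for illustration:

```python
import hashlib
import hmac
import secrets

ITERATIONS = 100_000  # illustrative work factor only, not a recommendation


def hash_password(password):
    """Return 'salt$hash' (both hex-encoded) using PBKDF2-HMAC-SHA256."""
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password(stored, candidate):
    """Recompute the hash with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", candidate.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(digest.hex(), digest_hex)


if __name__ == "__main__":
    stored = hash_password("securepass123")
    print(verify_password(stored, "securepass123"))
    print(verify_password(stored, "wrong-password"))
```

Because the salt is random, hashing the same password twice gives two different strings, which is why the model stores the hash and never compares raw passwords.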

# app/models/product.py
from app import db
from datetime import datetime

class Product(db.Model):
    __tablename__ = 'products'
    
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    price = db.Column(db.Numeric(10, 2), nullable=False)
    category = db.Column(db.String(50), nullable=False)  # coffee, tea, pastry, etc.
    is_available = db.Column(db.Boolean, default=True)
    image_url = db.Column(db.String(255))
    ingredients = db.Column(db.JSON)  # Store as JSON for flexibility
    nutritional_info = db.Column(db.JSON)
    preparation_time = db.Column(db.Integer)  # in minutes
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    
    # Relationships
    order_items = db.relationship('OrderItem', backref='product', lazy='dynamic')
    
    def to_dict(self):
        return {
            'id': self.id,
            'name': self.name,
            'description': self.description,
            'price': float(self.price),
            'category': self.category,
            'is_available': self.is_available,
            'image_url': self.image_url,
            'ingredients': self.ingredients,
            'nutritional_info': self.nutritional_info,
            'preparation_time': self.preparation_time,
            'created_at': self.created_at.isoformat()
        }
    
    @classmethod
    def get_available_products(cls):
        """Get all available products."""
        return cls.query.filter_by(is_available=True).all()
    
    @classmethod
    def get_by_category(cls, category):
        """Get products by category."""
        return cls.query.filter_by(category=category, is_available=True).all()

# app/models/order.py
from app import db
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP

class Order(db.Model):
    __tablename__ = 'orders'
    
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    status = db.Column(db.String(20), default='pending')  # pending, confirmed, preparing, ready, completed, cancelled
    subtotal = db.Column(db.Numeric(10, 2), nullable=False)
    tax_amount = db.Column(db.Numeric(10, 2), nullable=False)
    discount_amount = db.Column(db.Numeric(10, 2), default=0)
    total_amount = db.Column(db.Numeric(10, 2), nullable=False)
    payment_method = db.Column(db.String(50))
    special_instructions = db.Column(db.Text)
    estimated_ready_time = db.Column(db.DateTime)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    # Relationships
    items = db.relationship('OrderItem', backref='order', lazy='dynamic', cascade='all, delete-orphan')
    
    def calculate_totals(self, tax_rate=0.101):
        """Calculate order totals."""
        # Numeric columns hold Decimal values, and Decimal cannot be multiplied
        # by a float, so convert the rate before doing any arithmetic
        tax_rate = Decimal(str(tax_rate))
        self.subtotal = sum((item.line_total for item in self.items), Decimal('0'))
        
        # Apply discount first
        discounted_subtotal = self.subtotal - (self.discount_amount or 0)
        
        # Calculate tax on discounted amount, rounded to whole cents
        self.tax_amount = (discounted_subtotal * tax_rate).quantize(Decimal('0.01'), rounding=ROUND_HALF_UP)
        
        # Total
        self.total_amount = discounted_subtotal + self.tax_amount
        
    def to_dict(self, include_items=True):
        data = {
            'id': self.id,
            'user_id': self.user_id,
            'status': self.status,
            'subtotal': float(self.subtotal),
            'tax_amount': float(self.tax_amount),
            'discount_amount': float(self.discount_amount or 0),
            'total_amount': float(self.total_amount),
            'payment_method': self.payment_method,
            'special_instructions': self.special_instructions,
            'estimated_ready_time': self.estimated_ready_time.isoformat() if self.estimated_ready_time else None,
            'created_at': self.created_at.isoformat(),
            'updated_at': self.updated_at.isoformat()
        }
        
        if include_items:
            data['items'] = [item.to_dict() for item in self.items]
        
        return data

class OrderItem(db.Model):
    __tablename__ = 'order_items'
    
    id = db.Column(db.Integer, primary_key=True)
    order_id = db.Column(db.Integer, db.ForeignKey('orders.id'), nullable=False)
    product_id = db.Column(db.Integer, db.ForeignKey('products.id'), nullable=False)
    quantity = db.Column(db.Integer, nullable=False, default=1)
    unit_price = db.Column(db.Numeric(10, 2), nullable=False)
    customizations = db.Column(db.JSON)  # size, milk type, extra shots, etc.
    
    @property
    def line_total(self):
        """Calculate line total for this item."""
        return self.quantity * self.unit_price
    
    def to_dict(self):
        return {
            'id': self.id,
            'product_id': self.product_id,
            'product_name': self.product.name,
            'quantity': self.quantity,
            'unit_price': float(self.unit_price),
            'line_total': float(self.line_total),
            'customizations': self.customizations
        }
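
The order math in calculate_totals is easier to trust with numbers in front of you. A small worked example, assuming two lattes at $4.50, a $1.00 discount, and the 10.1% rate from the config. order_totals is a hypothetical standalone function, not a method from the models above:

```python
from decimal import Decimal, ROUND_HALF_UP

CENT = Decimal("0.01")


def order_totals(line_items, discount=Decimal("0"), tax_rate=Decimal("0.101")):
    """Compute subtotal, tax, and total: discount first, then tax on the remainder.

    line_items is a list of (quantity, unit_price) pairs with Decimal prices.
    """
    subtotal = sum((qty * price for qty, price in line_items), Decimal("0"))
    discounted = subtotal - discount
    tax = (discounted * tax_rate).quantize(CENT, rounding=ROUND_HALF_UP)
    return {"subtotal": subtotal, "tax": tax, "total": discounted + tax}


totals = order_totals([(2, Decimal("4.50"))], discount=Decimal("1.00"))
print(totals)  # subtotal 9.00, tax 0.81, total 8.81
```

Everything stays in Decimal on purpose: multiplying a Decimal by a float such as 0.101 raises TypeError, and floats would add rounding drift to money values anyway.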

Authentication and Authorization

# app/routes/auth.py
from flask import Blueprint, request, jsonify
from flask_jwt_extended import create_access_token, create_refresh_token, jwt_required, get_jwt_identity
from app.models.user import User
from app import db
from datetime import datetime

auth_bp = Blueprint('auth', __name__)

@auth_bp.route('/register', methods=['POST'])
def register():
    """Register a new user."""
    try:
        data = request.get_json()
        
        # Validate required fields
        required_fields = ['email', 'username', 'password', 'first_name', 'last_name']
        if not all(field in data for field in required_fields):
            return {'error': 'Missing required fields'}, 400
        
        # Check if user already exists
        if User.query.filter_by(email=data['email']).first():
            return {'error': 'Email already registered'}, 400
        
        if User.query.filter_by(username=data['username']).first():
            return {'error': 'Username already taken'}, 400
        
        # Create new user
        user = User(
            email=data['email'],
            username=data['username'],
            password=data['password'],
            first_name=data['first_name'],
            last_name=data['last_name'],
            role='customer'  # never trust a client-supplied role; promote users through an admin-only path
        )
        
        db.session.add(user)
        db.session.commit()
        
        # Create tokens
        access_token = create_access_token(identity=user.id)
        refresh_token = create_refresh_token(identity=user.id)
        
        return {
            'message': 'User registered successfully',
            'user': user.to_dict(),
            'access_token': access_token,
            'refresh_token': refresh_token
        }, 201
        
    except Exception as e:
        db.session.rollback()
        return {'error': 'Registration failed', 'details': str(e)}, 500

@auth_bp.route('/login', methods=['POST'])
def login():
    """Login user and return tokens."""
    try:
        data = request.get_json()
        
        if not data.get('email') or not data.get('password'):
            return {'error': 'Email and password required'}, 400
        
        # Find user
        user = User.query.filter_by(email=data['email']).first()
        
        if not user or not user.check_password(data['password']):
            return {'error': 'Invalid credentials'}, 401
        
        if not user.is_active:
            return {'error': 'Account is deactivated'}, 401
        
        # Update last login
        user.last_login = datetime.utcnow()
        db.session.commit()
        
        # Create tokens
        access_token = create_access_token(identity=user.id)
        refresh_token = create_refresh_token(identity=user.id)
        
        return {
            'message': 'Login successful',
            'user': user.to_dict(),
            'access_token': access_token,
            'refresh_token': refresh_token
        }
        
    except Exception as e:
        return {'error': 'Login failed', 'details': str(e)}, 500

@auth_bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
    """Refresh access token using refresh token."""
    try:
        current_user_id = get_jwt_identity()
        user = User.query.get(current_user_id)
        
        if not user or not user.is_active:
            return {'error': 'User not found or inactive'}, 404
        
        access_token = create_access_token(identity=user.id)
        
        return {
            'access_token': access_token,
            'user': user.to_dict()
        }
        
    except Exception as e:
        return {'error': 'Token refresh failed', 'details': str(e)}, 500

@auth_bp.route('/profile', methods=['GET'])
@jwt_required()
def get_profile():
    """Get current user profile."""
    try:
        current_user_id = get_jwt_identity()
        user = User.query.get(current_user_id)
        
        if not user:
            return {'error': 'User not found'}, 404
        
        return {'user': user.to_dict(include_sensitive=True)}
        
    except Exception as e:
        return {'error': 'Failed to get profile', 'details': str(e)}, 500

@auth_bp.route('/profile', methods=['PUT'])
@jwt_required()
def update_profile():
    """Update current user profile."""
    try:
        current_user_id = get_jwt_identity()
        user = User.query.get(current_user_id)
        
        if not user:
            return {'error': 'User not found'}, 404
        
        data = request.get_json()
        
        # Update allowed fields
        allowed_fields = ['first_name', 'last_name', 'username']
        for field in allowed_fields:
            if field in data:
                setattr(user, field, data[field])
        
        # Handle password change separately
        if 'current_password' in data and 'new_password' in data:
            if not user.check_password(data['current_password']):
                return {'error': 'Current password is incorrect'}, 400
            user.set_password(data['new_password'])
        
        db.session.commit()
        
        return {
            'message': 'Profile updated successfully',
            'user': user.to_dict()
        }
        
    except Exception as e:
        db.session.rollback()
        return {'error': 'Failed to update profile', 'details': str(e)}, 500
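
The access and refresh tokens above are signed tokens that carry an identity and an expiry. In the spirit of understanding the fundamentals, here is a standard-library sketch of an HS256-signed token with sub and exp claims. It is for understanding only: encode_token and decode_token are hypothetical names, this is not how flask_jwt_extended is implemented internally, and a real application should keep using the library:

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data):
    """Base64url-encode bytes without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text):
    """Decode base64url text, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def encode_token(identity, secret, expires_in, now=None):
    """Build an HS256-signed token with `sub` and `exp` claims."""
    now = int(time.time()) if now is None else now
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url(json.dumps({"sub": str(identity), "exp": now + expires_in}).encode())
    signature = hmac.new(secret.encode(), f"{header}.{payload}".encode(), hashlib.sha256).digest()
    return f"{header}.{payload}.{_b64url(signature)}"


def decode_token(token, secret, now=None):
    """Verify the signature and the expiry, then return the claims."""
    now = int(time.time()) if now is None else now
    header, payload, signature = token.split(".")
    expected = hmac.new(secret.encode(), f"{header}.{payload}".encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _b64url_decode(signature)):
        raise ValueError("bad signature")
    claims = json.loads(_b64url_decode(payload))
    if claims["exp"] <= now:
        raise ValueError("token expired")
    return claims


if __name__ == "__main__":
    token = encode_token(42, "jwt-secret-string", expires_in=3600)
    print(decode_token(token, "jwt-secret-string"))
```

The short-lived access token limits the damage if one leaks, while the long-lived refresh token lets the client get a new one without asking for the password again.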

Building REST API Endpoints

# app/routes/products.py
from flask import Blueprint, request, jsonify
from flask_jwt_extended import jwt_required, get_jwt_identity
from app.models.product import Product
from app.models.user import User
from app import db

products_bp = Blueprint('products', __name__)

@products_bp.route('', methods=['GET'])
def get_products():
    """Get all available products with optional filtering."""
    try:
        # Query parameters
        category = request.args.get('category')
        search = request.args.get('search')
        page = request.args.get('page', 1, type=int)
        per_page = request.args.get('per_page', 20, type=int)
        
        # Build query
        query = Product.query.filter_by(is_available=True)
        
        if category:
            query = query.filter_by(category=category)
        
        if search:
            query = query.filter(Product.name.ilike(f'%{search}%'))
        
        # Paginate results
        products = query.paginate(
            page=page, 
            per_page=per_page, 
            error_out=False
        )
        
        return {
            'products': [product.to_dict() for product in products.items],
            'pagination': {
                'page': page,
                'per_page': per_page,
                'total': products.total,
                'pages': products.pages,
                'has_prev': products.has_prev,
                'has_next': products.has_next
            }
        }
        
    except Exception as e:
        return {'error': 'Failed to get products', 'details': str(e)}, 500

@products_bp.route('/<int:product_id>', methods=['GET'])
def get_product(product_id):
    """Get specific product by ID."""
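    # Look the product up outside the try block: get_or_404 raises an HTTPException,
    # which the broad `except Exception` below would turn into a 500
    product = Product.query.get_or_404(product_id)
    
    try: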
        
        if not product.is_available:
            return {'error': 'Product not available'}, 404
        
        return {'product': product.to_dict()}
        
    except Exception as e:
        return {'error': 'Failed to get product', 'details': str(e)}, 500

@products_bp.route('/categories', methods=['GET'])
def get_categories():
    """Get all product categories."""
    try:
        categories = db.session.query(Product.category).filter_by(is_available=True).distinct().all()
        return {'categories': [cat[0] for cat in categories]}
        
    except Exception as e:
        return {'error': 'Failed to get categories', 'details': str(e)}, 500

# Admin-only endpoints
@products_bp.route('', methods=['POST'])
@jwt_required()
def create_product():
    """Create new product (admin only)."""
    try:
        current_user_id = get_jwt_identity()
        user = User.query.get(current_user_id)
        
        if not user or user.role != 'admin':
            return {'error': 'Admin access required'}, 403
        
        data = request.get_json()
        
        # Validate required fields
        required_fields = ['name', 'price', 'category']
        if not all(field in data for field in required_fields):
            return {'error': 'Missing required fields'}, 400
        
        product = Product(
            name=data['name'],
            description=data.get('description'),
            price=data['price'],
            category=data['category'],
            image_url=data.get('image_url'),
            ingredients=data.get('ingredients'),
            nutritional_info=data.get('nutritional_info'),
            preparation_time=data.get('preparation_time')
        )
        
        db.session.add(product)
        db.session.commit()
        
        return {
            'message': 'Product created successfully',
            'product': product.to_dict()
        }, 201
        
    except Exception as e:
        db.session.rollback()
        return {'error': 'Failed to create product', 'details': str(e)}, 500
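
The pagination block returned by get_products comes from Flask-SQLAlchemy's paginate(). The arithmetic behind those fields is simple. A sketch with a plain list standing in for the query; paginate_list is a hypothetical helper that mirrors the meaning of the fields, not the library's code:

```python
import math


def paginate_list(items, page=1, per_page=20):
    """Slice a list the way a paginated query would and report page metadata."""
    total = len(items)
    pages = math.ceil(total / per_page) if total else 0
    start = (page - 1) * per_page
    return {
        "items": items[start:start + per_page],
        "page": page,
        "per_page": per_page,
        "total": total,
        "pages": pages,
        "has_prev": page > 1,
        "has_next": page < pages,
    }


menu = [f"product-{n}" for n in range(1, 46)]  # 45 fake products
result = paginate_list(menu, page=3, per_page=20)
print(len(result["items"]), result["pages"], result["has_prev"], result["has_next"])
```

With 45 products and 20 per page, page 3 is the last page and holds the remaining 5 items, so has_next is False.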

# app/routes/orders.py
from flask import Blueprint, request, jsonify
from flask_jwt_extended import jwt_required, get_jwt_identity
from app.models.order import Order, OrderItem
from app.models.product import Product
from app.models.user import User
from app import db
from datetime import datetime, timedelta

orders_bp = Blueprint('orders', __name__)

@orders_bp.route('', methods=['POST'])
@jwt_required()
def create_order():
    """Create a new order."""
    try:
        current_user_id = get_jwt_identity()
        user = User.query.get(current_user_id)
        
        if not user:
            return {'error': 'User not found'}, 404
        
        data = request.get_json()
        
        if not data.get('items'):
            return {'error': 'Order must contain items'}, 400
        
        # Create order
        order = Order(
            user_id=current_user_id,
            subtotal=0,
            tax_amount=0,
            total_amount=0,
            payment_method=data.get('payment_method'),
            special_instructions=data.get('special_instructions')
        )
        
        db.session.add(order)
        db.session.flush()  # Get order ID
        
        # Add order items
        total_prep_time = 0
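        for item_data in data['items']:
            product = Product.query.get(item_data['product_id'])
            
            if not product or not product.is_available:
                db.session.rollback()  # discard the flushed order
                return {'error': f'Product {item_data["product_id"]} not available'}, 400
            
            # Reject zero, negative, or non-integer quantities before pricing anything
            quantity = item_data.get('quantity', 1)
            if not isinstance(quantity, int) or isinstance(quantity, bool) or quantity < 1:
                db.session.rollback()  # discard the flushed order
                return {'error': 'Quantity must be a positive integer'}, 400
            
            order_item = OrderItem(
                order_id=order.id,
                product_id=product.id,
                quantity=quantity,
                unit_price=product.price,
                customizations=item_data.get('customizations')
            )
            
            db.session.add(order_item)
            
            # Calculate preparation time
            if product.preparation_time:
                total_prep_time += product.preparation_time * quantity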
        
        # Calculate totals
        order.calculate_totals()
        
        # Set estimated ready time
        if total_prep_time > 0:
            order.estimated_ready_time = datetime.utcnow() + timedelta(minutes=total_prep_time)
        
        # Award loyalty points (1 point per dollar)
        loyalty_points = int(order.total_amount)
        user.add_loyalty_points(loyalty_points)
        
        db.session.commit()
        
        return {
            'message': 'Order created successfully',
            'order': order.to_dict(),
            'loyalty_points_earned': loyalty_points
        }, 201
        
    except Exception as e:
        db.session.rollback()
        return {'error': 'Failed to create order', 'details': str(e)}, 500

@orders_bp.route('', methods=['GET'])
@jwt_required()
def get_user_orders():
    """Get current user's orders."""
    try:
        current_user_id = get_jwt_identity()
        
        page = request.args.get('page', 1, type=int)
        per_page = request.args.get('per_page', 10, type=int)
        status = request.args.get('status')
        
        query = Order.query.filter_by(user_id=current_user_id)
        
        if status:
            query = query.filter_by(status=status)
        
        orders = query.order_by(Order.created_at.desc()).paginate(
            page=page,
            per_page=per_page,
            error_out=False
        )
        
        return {
            'orders': [order.to_dict() for order in orders.items],
            'pagination': {
                'page': page,
                'per_page': per_page,
                'total': orders.total,
                'pages': orders.pages
            }
        }
        
    except Exception as e:
        return {'error': 'Failed to get orders', 'details': str(e)}, 500

@orders_bp.route('/<int:order_id>', methods=['GET'])
@jwt_required()
def get_order(order_id):
    """Get specific order details."""
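    # Look the order up outside the try block: get_or_404 raises an HTTPException,
    # which the broad `except Exception` below would turn into a 500
    order = Order.query.get_or_404(order_id)
    
    try:
        current_user_id = get_jwt_identity()
        user = User.query.get(current_user_id)
        
        # Check if user owns this order or is admin
        is_admin = user is not None and user.role == 'admin'
        if order.user_id != current_user_id and not is_admin: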
            return {'error': 'Access denied'}, 403
        
        return {'order': order.to_dict()}
        
    except Exception as e:
        return {'error': 'Failed to get order', 'details': str(e)}, 500

@orders_bp.route('/<int:order_id>/cancel', methods=['POST'])
@jwt_required()
def cancel_order(order_id):
    """Cancel an order."""
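    # Resolve the order first so a missing ID returns 404 instead of being
    # swallowed by the broad except below and reported as a 500
    order = Order.query.get_or_404(order_id)
    
    try:
        current_user_id = get_jwt_identity()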
        
        # Check ownership
        if order.user_id != current_user_id:
            return {'error': 'Access denied'}, 403
        
        # Check if order can be cancelled
        if order.status not in ['pending', 'confirmed']:
            return {'error': 'Order cannot be cancelled'}, 400
        
        order.status = 'cancelled'
        order.updated_at = datetime.utcnow()
        
        db.session.commit()
        
        return {
            'message': 'Order cancelled successfully',
            'order': order.to_dict()
        }
        
    except Exception as e:
        db.session.rollback()
        return {'error': 'Failed to cancel order', 'details': str(e)}, 500
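
cancel_order hard-codes which statuses can still be cancelled. Once more endpoints start changing order status, a transition table keeps those rules in one place. A sketch, assuming a forward-only flow through the statuses listed on the Order model. Only the cancellation rule (from pending or confirmed) comes from the code above; the other transitions and both function names are assumptions:

```python
# Which statuses an order may move to from each status (assumed flow)
ALLOWED_TRANSITIONS = {
    "pending": {"confirmed", "cancelled"},
    "confirmed": {"preparing", "cancelled"},
    "preparing": {"ready"},
    "ready": {"completed"},
    "completed": set(),
    "cancelled": set(),
}


def can_transition(current, new):
    """Return True if an order may move from `current` to `new`."""
    return new in ALLOWED_TRANSITIONS.get(current, set())


def change_status(current, new):
    """Return the new status, or raise ValueError for an illegal move."""
    if not can_transition(current, new):
        raise ValueError(f"Cannot move order from {current!r} to {new!r}")
    return new


if __name__ == "__main__":
    print(can_transition("pending", "cancelled"))
    print(can_transition("preparing", "cancelled"))
```

A route can then call change_status and translate the ValueError into a 400, instead of repeating status lists in every view.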

Custom Decorators and Middleware

# app/utils/decorators.py
from functools import wraps
from flask import jsonify
from flask_jwt_extended import get_jwt_identity
from app.models.user import User

def admin_required(f):
    """Decorator to require admin role.

    Place it below @jwt_required() so a verified token is available
    when get_jwt_identity() runs.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        current_user_id = get_jwt_identity()
        user = User.query.get(current_user_id)
        
        if not user or user.role != 'admin':
            return jsonify({'error': 'Admin access required'}), 403
        
        return f(*args, **kwargs)
    return decorated_function

def role_required(required_role):
    """Decorator to require specific role.

    Place it below @jwt_required() so a verified token is available
    when get_jwt_identity() runs.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            current_user_id = get_jwt_identity()
            user = User.query.get(current_user_id)
            
            if not user or user.role != required_role:
                return jsonify({'error': f'{required_role.title()} access required'}), 403
            
            return f(*args, **kwargs)
        return decorated_function
    return decorator

def validate_json(*required_fields):
    """Decorator to validate JSON input."""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            from flask import request
            
            if not request.is_json:
                return jsonify({'error': 'Content-Type must be application/json'}), 400
            
            data = request.get_json()
            
            missing_fields = [field for field in required_fields if field not in data]
            if missing_fields:
                return jsonify({
                    'error': 'Missing required fields',
                    'missing_fields': missing_fields
                }), 400
            
            return f(*args, **kwargs)
        return decorated_function
    return decorator
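
If decorator factories like role_required still feel like magic, it helps to run one without Flask in the way. A framework-free sketch: CURRENT_USER is a stand-in for the JWT lookup, and the two views are made up for illustration:

```python
from functools import wraps

CURRENT_USER = {"username": "maya", "role": "barista"}  # stand-in for the JWT lookup


def role_required(required_role):
    """Decorator factory: only call the view if the current user has the role."""
    def decorator(f):
        @wraps(f)  # keep the view's __name__ and docstring intact
        def decorated_function(*args, **kwargs):
            if CURRENT_USER.get("role") != required_role:
                return {"error": f"{required_role.title()} access required"}, 403
            return f(*args, **kwargs)
        return decorated_function
    return decorator


@role_required("admin")
def delete_product(product_id):
    """Pretend view that only admins may call."""
    return {"deleted": product_id}, 200


@role_required("barista")
def mark_ready(order_id):
    """Pretend view that baristas may call."""
    return {"order_id": order_id, "status": "ready"}, 200


if __name__ == "__main__":
    print(delete_product(1))
    print(mark_ready(7))
    print(delete_product.__name__)
```

The @wraps call matters more in Flask than it looks: Flask names each endpoint after the view function by default, so without it every decorated view would be called decorated_function and the second route registration would fail.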

Testing Your Flask Application

# tests/test_auth.py
import pytest
from app import create_app, db
from app.models.user import User
from config import TestingConfig

@pytest.fixture
def app():
    app = create_app(TestingConfig)
    with app.app_context():
        db.create_all()
        yield app
        db.drop_all()

@pytest.fixture
def client(app):
    return app.test_client()

@pytest.fixture
def auth_headers(client):
    """Create a user and return auth headers."""
    # Register user
    user_data = {
        'email': 'test@example.com',
        'username': 'testuser',
        'password': 'testpass123',
        'first_name': 'Test',
        'last_name': 'User'
    }
    
    response = client.post('/api/auth/register', json=user_data)
    data = response.get_json()
    
    return {'Authorization': f'Bearer {data["access_token"]}'}

class TestAuth:
    def test_user_registration(self, client):
        """Test user registration."""
        user_data = {
            'email': 'maya@example.com',
            'username': 'maya',
            'password': 'securepass123',
            'first_name': 'Maya',
            'last_name': 'Chen'
        }
        
        response = client.post('/api/auth/register', json=user_data)
        data = response.get_json()
        
        assert response.status_code == 201
        assert 'access_token' in data
        assert data['user']['email'] == 'maya@example.com'
    
    def test_user_login(self, client):
        """Test user login."""
        # First register a user
        user_data = {
            'email': 'maya@example.com',
            'username': 'maya',
            'password': 'securepass123',
            'first_name': 'Maya',
            'last_name': 'Chen'
        }
        client.post('/api/auth/register', json=user_data)
        
        # Then login
        login_data = {
            'email': 'maya@example.com',
            'password': 'securepass123'
        }
        
        response = client.post('/api/auth/login', json=login_data)
        data = response.get_json()
        
        assert response.status_code == 200
        assert 'access_token' in data
        assert data['user']['email'] == 'maya@example.com'
    
    def test_protected_route(self, client, auth_headers):
        """Test accessing protected route."""
        response = client.get('/api/auth/profile', headers=auth_headers)
        data = response.get_json()
        
        assert response.status_code == 200
        assert 'user' in data

# tests/test_orders.py
# Assumes the app, client, and auth_headers fixtures above live in tests/conftest.py
from decimal import Decimal
from app import db
from app.models.product import Product

class TestOrders:
    def test_create_order(self, client, auth_headers):
        """Test order creation."""
        # Creating a product through the API needs an admin token,
        # so seed one directly in the database instead
        product = Product(name='Test Latte', price=Decimal('4.50'), category='coffee')
        db.session.add(product)
        db.session.commit()
        
        order_data = {
            'items': [
                {
                    'product_id': product.id,
                    'quantity': 2,
                    'customizations': {'size': 'large', 'milk': 'oat'}
                }
            ],
            'payment_method': 'credit_card'
        }
        
        response = client.post('/api/orders', json=order_data, headers=auth_headers)
        data = response.get_json()
        
        assert response.status_code == 201
        assert data['order']['items'][0]['product_name'] == 'Test Latte'
        assert data['order']['items'][0]['quantity'] == 2

Deployment and Production Considerations

# run.py
import os
from app import create_app
from config import config

config_name = os.environ.get('FLASK_CONFIG', 'default')
app = create_app(config[config_name])

if __name__ == '__main__':
    app.run(
        host='0.0.0.0',
        port=int(os.environ.get('PORT', 5000)),
        debug=app.config['DEBUG']
    )

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Create non-root user
RUN useradd -m -u 1000 flaskuser && chown -R flaskuser:flaskuser /app
USER flaskuser

# Environment variables
ENV FLASK_APP=run.py
ENV FLASK_CONFIG=production

EXPOSE 5000

CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "4", "run:app"]
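
The Dockerfile hard-codes four workers. Gunicorn can also read its settings from a Python config file, which lets the worker count follow the machine it runs on. A sketch of a gunicorn.conf.py, assuming the common (2 x cores) + 1 starting point. The environment-variable overrides and the timeout value are assumptions to tune for your own workload:

```python
# gunicorn.conf.py
import multiprocessing
import os

# Listen on all interfaces; PORT override is an assumption matching run.py
bind = f"0.0.0.0:{os.environ.get('PORT', '5000')}"

# Start from (2 x cores) + 1 unless WEB_CONCURRENCY says otherwise
workers = int(os.environ.get("WEB_CONCURRENCY", multiprocessing.cpu_count() * 2 + 1))

# Seconds before an unresponsive worker is restarted (illustrative value)
timeout = 30

# Send access and error logs to stdout/stderr so the container runtime collects them
accesslog = "-"
errorlog = "-"
```

With that file next to run.py, the container command becomes gunicorn -c gunicorn.conf.py run:app.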

Final Thoughts: Flask's Philosophy

That transition from Django to Flask taught me something crucial: understanding your tools makes you a better developer than relying on magic. Flask forces you to understand web development fundamentals - HTTP, routing, authentication, database relationships.

Yes, you write more boilerplate code in Flask. But every line you write is intentional and understood. When something breaks, you know exactly where to look. When you need to customize behavior, you know exactly how to do it.

Flask's minimalism isn't about doing less - it's about doing exactly what you need, when you need it. Whether you're building a simple API or a complex web application, Flask gives you the building blocks without the assumptions.

Start with the basics: routes, templates, database models. Add complexity gradually: blueprints, authentication, testing. Before you know it, you'll be building robust, scalable web applications that you completely understand.

Remember: Flask doesn't make decisions for you - it gives you the power to make good decisions yourself.


Currently writing this from Lighthouse Roasters in Fremont, where I'm debugging a Flask deployment while sipping their excellent pour-over. Building your first Flask app? Share your experience @maya_codes_pnw - we've all been there! ⚡☕
