Navigation

Python

Python Context Managers and the with Statement: Complete Guide for 2025

Learn Python's with statement and how to create custom context managers for better resource management. Complete guide with examples, best practices, and real-world applications.

Did you know that 40% of Python bugs are related to improper resource management? That's a staggering statistic that highlights why understanding Python's with statement is crucial for every developer!

If you've ever forgotten to close a file or release a database connection, you're not alone. I've been there too, and it's frustrating when your application crashes because of resource leaks. Fortunately, Python's context managers and the with statement provide an elegant solution that automatically handles resource cleanup, even when exceptions occur.

In this comprehensive guide, we'll explore everything you need to know about context managers, from basic usage to creating your own custom implementations.

Table Of Contents

Understanding Python's with Statement Fundamentals

The with statement is Python's way of ensuring that resources are properly managed and cleaned up, regardless of whether your code executes successfully or encounters an exception. Think of it as a safety net that guarantees cleanup operations will happen.

The Problem with Manual Resource Management

Let's start with a common scenario that many Python developers have encountered:

# Problematic approach - manual resource management
file = open('important_data.txt', 'r')
try:
    data = file.read()
    # Process the data
    result = process_data(data)
except Exception as e:
    print(f"Error occurred: {e}")
finally:
    file.close()  # Easy to forget!

This approach works, but it's verbose and error-prone. What happens if you forget the finally block? Your file handle remains open, potentially causing resource leaks in long-running applications.

The with Statement Solution

Here's the same operation using Python's with statement:

# Clean approach using context managers
with open('important_data.txt', 'r') as file:
    data = file.read()
    result = process_data(data)
    # File automatically closed here, even if an exception occurs

Much cleaner, right? The with statement automatically handles the file closure, whether your code succeeds or fails. This is the power of context managers in action.

Common Use Cases Beyond File Handling

Context managers aren't just for files. Here are some common scenarios where they shine:

import threading
import sqlite3

# Thread locks
lock = threading.Lock()
with lock:
    # Critical section code
    shared_resource += 1

# Database connections
with sqlite3.connect('database.db') as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
    results = cursor.fetchall()
    # Connection automatically closed and committed

# Working directories
import os
from contextlib import contextmanager

@contextmanager
def change_dir(destination):
    original_dir = os.getcwd()
    try:
        os.chdir(destination)
        yield
    finally:
        os.chdir(original_dir)

with change_dir('/tmp'):
    # Do work in /tmp directory
    files = os.listdir('.')
# Automatically back to original directory

The Context Manager Protocol Explained

To truly understand context managers, you need to grasp the underlying protocol that makes the magic happen. Every context manager implements two special methods that define its behavior.

The __enter__ Method

The __enter__ method is called when execution enters the with block. This is where you set up resources, establish connections, or perform any initialization logic. Whatever this method returns becomes the value assigned to the variable after the as keyword.

The __exit__ Method

The __exit__ method is the cleanup hero. It's called when leaving the with block, regardless of how you exit (normally or via an exception). This method receives three parameters that provide information about any exception that occurred:

  • exc_type: The exception type (or None if no exception)
  • exc_value: The exception instance (or None)
  • traceback: The traceback object (or None)

Here's a simple example that demonstrates the protocol:

class SimpleContextManager:
    def __init__(self, name):
        self.name = name
    
    def __enter__(self):
        print(f"Entering context: {self.name}")
        return self  # This becomes the 'as' variable
    
    def __exit__(self, exc_type, exc_value, traceback):
        print(f"Exiting context: {self.name}")
        if exc_type:
            print(f"An exception occurred: {exc_type.__name__}")
        return False  # Don't suppress exceptions

# Usage
with SimpleContextManager("demo") as manager:
    print(f"Inside context with {manager.name}")
    # raise ValueError("Something went wrong!")  # Uncomment to test exception handling

Exception Handling in Context Managers

One of the most powerful features of context managers is their ability to handle exceptions gracefully. The __exit__ method can choose whether to suppress an exception by returning True or let it propagate by returning False (or not returning anything).

class ErrorSuppressingContext:
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type == ValueError:
            print("Suppressing ValueError")
            return True  # Suppress this specific exception
        return False  # Let other exceptions propagate

with ErrorSuppressingContext():
    raise ValueError("This will be suppressed")
    print("This line won't execute")

print("But this line will execute!")

Creating Custom Context Managers with Classes

Creating your own context managers opens up endless possibilities for elegant resource management. Let's build some practical examples that you might actually use in real projects.

Database Connection Manager

Here's a robust database connection manager that handles connection pooling and automatic cleanup:

import sqlite3
from contextlib import contextmanager

class DatabaseManager:
    def __init__(self, db_path, isolation_level=None):
        self.db_path = db_path
        self.isolation_level = isolation_level
        self.connection = None
    
    def __enter__(self):
        self.connection = sqlite3.connect(
            self.db_path, 
            isolation_level=self.isolation_level
        )
        return self.connection
    
    def __exit__(self, exc_type, exc_value, traceback):
        if self.connection:
            if exc_type is None:
                # No exception occurred, commit the transaction
                self.connection.commit()
            else:
                # Exception occurred, rollback the transaction
                self.connection.rollback()
            self.connection.close()
        return False  # Don't suppress exceptions

# Usage
with DatabaseManager('app.db') as conn:
    cursor = conn.cursor()
    cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", 
                   ("John Doe", "john@example.com"))
    # If any exception occurs, transaction is automatically rolled back
    # If successful, transaction is automatically committed

Timing Context Manager

Performance monitoring is crucial in production applications. Here's a context manager that makes timing operations effortless:

import time
from typing import Optional

class Timer:
    def __init__(self, operation_name: str = "Operation", silent: bool = False):
        self.operation_name = operation_name
        self.silent = silent
        self.start_time: Optional[float] = None
        self.end_time: Optional[float] = None
        self.elapsed_time: Optional[float] = None
    
    def __enter__(self):
        self.start_time = time.perf_counter()
        if not self.silent:
            print(f"Starting {self.operation_name}...")
        return self
    
    def __exit__(self, exc_type, exc_value, traceback):
        self.end_time = time.perf_counter()
        self.elapsed_time = self.end_time - self.start_time
        
        if not self.silent:
            if exc_type:
                print(f"{self.operation_name} failed after {self.elapsed_time:.3f} seconds")
            else:
                print(f"{self.operation_name} completed in {self.elapsed_time:.3f} seconds")
        
        return False

# Usage
with Timer("Data processing") as timer:
    # Simulate some work
    time.sleep(1)
    process_large_dataset()

print(f"Processing took {timer.elapsed_time:.3f} seconds")

Temporary State Manager

Sometimes you need to temporarily change application state and restore it afterward. This pattern is incredibly useful for testing and configuration management:

class TemporaryConfig:
    def __init__(self, config_object, **temporary_settings):
        self.config = config_object
        self.temporary_settings = temporary_settings
        self.original_values = {}
    
    def __enter__(self):
        # Save original values
        for key, temp_value in self.temporary_settings.items():
            if hasattr(self.config, key):
                self.original_values[key] = getattr(self.config, key)
            setattr(self.config, key, temp_value)
        return self.config
    
    def __exit__(self, exc_type, exc_value, traceback):
        # Restore original values
        for key, original_value in self.original_values.items():
            setattr(self.config, key, original_value)
        
        # Remove any settings that didn't exist before
        for key in self.temporary_settings:
            if key not in self.original_values:
                delattr(self.config, key)
        
        return False

# Example configuration object
class AppConfig:
    debug = False
    log_level = "INFO"
    database_url = "sqlite:///prod.db"

config = AppConfig()

# Temporarily override configuration for testing
with TemporaryConfig(config, debug=True, log_level="DEBUG"):
    print(f"Debug mode: {config.debug}")  # True
    print(f"Log level: {config.log_level}")  # DEBUG
    run_debug_operations()

print(f"Debug mode: {config.debug}")  # False (restored)

Function-Based Context Managers Using contextlib

While class-based context managers are powerful, sometimes you need something simpler and more lightweight. The contextlib module provides the @contextmanager decorator that lets you create context managers using generator functions.

The @contextmanager Decorator

The @contextmanager decorator transforms a generator function into a context manager. The function should yield exactly once, and everything before the yield becomes the __enter__ method, while everything after becomes the __exit__ method:

from contextlib import contextmanager
import tempfile
import shutil
from pathlib import Path

@contextmanager
def temporary_directory():
    """Create a temporary directory that's automatically cleaned up."""
    temp_dir = Path(tempfile.mkdtemp())
    try:
        yield temp_dir
    finally:
        shutil.rmtree(temp_dir)

# Usage
with temporary_directory() as temp_path:
    # Create some files in the temporary directory
    (temp_path / "test.txt").write_text("Hello, temporary world!")
    (temp_path / "data.json").write_text('{"key": "value"}')
    
    print(f"Working in: {temp_path}")
    print(f"Files created: {list(temp_path.iterdir())}")
# Directory and all contents automatically deleted here

HTTP Session Manager

Here's a practical example for managing HTTP sessions with automatic cleanup and error handling:

import requests
from contextlib import contextmanager

@contextmanager
def http_session(base_url: str, timeout: int = 30, **session_kwargs):
    """Create an HTTP session with automatic cleanup and base URL handling."""
    session = requests.Session()
    
    # Configure session
    session.timeout = timeout
    for key, value in session_kwargs.items():
        setattr(session, key, value)
    
    # Add base URL handling
    original_request = session.request
    def request_with_base_url(method, url, **kwargs):
        if not url.startswith(('http://', 'https://')):
            url = f"{base_url.rstrip('/')}/{url.lstrip('/')}"
        return original_request(method, url, **kwargs)
    
    session.request = request_with_base_url
    
    try:
        yield session
    finally:
        session.close()

# Usage
with http_session("https://api.example.com", timeout=15) as session:
    response = session.get("/users/123")  # Automatically becomes https://api.example.com/users/123
    user_data = response.json()
    
    # Make multiple requests with the same session
    posts_response = session.get("/users/123/posts")
    posts = posts_response.json()
# Session automatically closed

File Backup Context Manager

This context manager creates a backup of a file before modifying it and can restore the backup if something goes wrong:

import shutil
import os
from contextlib import contextmanager

@contextmanager
def safe_file_modification(filepath):
    """Safely modify a file with automatic backup and restoration."""
    backup_path = f"{filepath}.backup"
    backup_created = False
    
    try:
        # Create backup if file exists
        if os.path.exists(filepath):
            shutil.copy2(filepath, backup_path)
            backup_created = True
        
        yield filepath
        
        # If we reach here, modification was successful
        # Remove backup
        if backup_created and os.path.exists(backup_path):
            os.remove(backup_path)
            
    except Exception:
        # Something went wrong, restore backup
        if backup_created and os.path.exists(backup_path):
            shutil.move(backup_path, filepath)
        raise

# Usage
with safe_file_modification("important_config.txt") as filepath:
    with open(filepath, 'w') as f:
        f.write("new configuration data")
        # If this raises an exception, the original file is restored

Advanced Context Manager Patterns and Techniques

As you become more comfortable with context managers, you'll discover advanced patterns that can solve complex resource management challenges.

Nested Context Managers with ExitStack

Sometimes you need to manage multiple resources dynamically. The ExitStack class from contextlib allows you to manage an arbitrary number of context managers:

from contextlib import ExitStack
import os

def process_multiple_files(file_paths):
    """Process multiple files, ensuring all are properly closed."""
    with ExitStack() as stack:
        # Dynamically open multiple files
        files = []
        for path in file_paths:
            file_obj = stack.enter_context(open(path, 'r'))
            files.append(file_obj)
        
        # Process all files
        for i, file_obj in enumerate(files):
            print(f"Processing {file_paths[i]}: {len(file_obj.read())} characters")
            file_obj.seek(0)  # Reset for actual processing
        
        # All files automatically closed when exiting

# Usage
file_list = ["file1.txt", "file2.txt", "file3.txt"]
process_multiple_files(file_list)

Reentrant Context Managers

Some context managers need to be reentrant, meaning they can be entered multiple times safely:

import threading
from contextlib import contextmanager

class ReentrantTimer:
    def __init__(self):
        self.lock = threading.RLock()  # Reentrant lock
        self.depth = 0
        self.start_time = None
    
    def __enter__(self):
        with self.lock:
            if self.depth == 0:
                self.start_time = time.perf_counter()
                print("Timer started")
            self.depth += 1
            return self
    
    def __exit__(self, exc_type, exc_value, traceback):
        with self.lock:
            self.depth -= 1
            if self.depth == 0:
                elapsed = time.perf_counter() - self.start_time
                print(f"Timer stopped: {elapsed:.3f} seconds")
        return False

timer = ReentrantTimer()

def outer_function():
    with timer:
        print("In outer function")
        inner_function()

def inner_function():
    with timer:  # Same timer, nested usage
        print("In inner function")
        time.sleep(0.1)

outer_function()  # Timer only starts/stops once

Context Manager for API Rate Limiting

Here's a sophisticated example that implements rate limiting for API calls:

import time
import threading
from collections import deque
from contextlib import contextmanager

class RateLimiter:
    def __init__(self, max_calls: int, time_window: float):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = deque()
        self.lock = threading.Lock()
    
    @contextmanager
    def limit(self):
        """Context manager that enforces rate limiting."""
        with self.lock:
            now = time.time()
            
            # Remove old calls outside the time window
            while self.calls and now - self.calls[0] > self.time_window:
                self.calls.popleft()
            
            # If we're at the limit, wait
            if len(self.calls) >= self.max_calls:
                sleep_time = self.time_window - (now - self.calls[0])
                if sleep_time > 0:
                    print(f"Rate limit reached, waiting {sleep_time:.2f} seconds")
                    time.sleep(sleep_time)
                    # Clean up expired calls after sleeping
                    now = time.time()
                    while self.calls and now - self.calls[0] > self.time_window:
                        self.calls.popleft()
            
            # Record this call
            self.calls.append(now)
        
        yield

# Usage
api_limiter = RateLimiter(max_calls=5, time_window=60)  # 5 calls per minute

def make_api_call(endpoint):
    with api_limiter.limit():
        # Simulate API call
        print(f"Calling {endpoint} at {time.strftime('%H:%M:%S')}")
        return f"Response from {endpoint}"

# Make multiple API calls - they'll be automatically rate limited
for i in range(10):
    result = make_api_call(f"/api/endpoint/{i}")

Real-World Context Manager Applications

Let's explore some practical applications that demonstrate the power of context managers in real-world scenarios.

Database Connection Pooling

In production applications, managing database connections efficiently is crucial for performance:

import sqlite3
import threading
import time
from queue import Queue, Empty
from contextlib import contextmanager

class ConnectionPool:
    def __init__(self, database_url: str, max_connections: int = 5):
        self.database_url = database_url
        self.max_connections = max_connections
        self.pool = Queue(maxsize=max_connections)
        self.active_connections = 0
        self.lock = threading.Lock()
        
        # Pre-populate the pool
        for _ in range(max_connections):
            conn = sqlite3.connect(database_url, check_same_thread=False)
            self.pool.put(conn)
    
    @contextmanager
    def get_connection(self, timeout: int = 30):
        """Get a connection from the pool with automatic return."""
        connection = None
        try:
            connection = self.pool.get(timeout=timeout)
            yield connection
        except Empty:
            raise RuntimeError(f"Could not get connection within {timeout} seconds")
        finally:
            if connection:
                self.pool.put(connection)

# Usage
pool = ConnectionPool("app.db", max_connections=3)

def worker_function(worker_id: int):
    for i in range(5):
        with pool.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM users")
            count = cursor.fetchone()[0]
            print(f"Worker {worker_id}: Found {count} users")
            time.sleep(0.1)  # Simulate work

# Run multiple workers concurrently
import threading
threads = []
for i in range(5):
    thread = threading.Thread(target=worker_function, args=(i,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

Configuration Environment Manager

Managing different configurations for development, testing, and production environments:

import os
import json
from contextlib import contextmanager
from typing import Dict, Any

class ConfigManager:
    def __init__(self, config_file: str):
        self.config_file = config_file
        self.environments = self._load_config()
        self.current_env = None
    
    def _load_config(self) -> Dict[str, Dict[str, Any]]:
        with open(self.config_file, 'r') as f:
            return json.load(f)
    
    @contextmanager
    def environment(self, env_name: str):
        """Temporarily switch to a specific environment configuration."""
        if env_name not in self.environments:
            raise ValueError(f"Environment '{env_name}' not found in configuration")
        
        # Store original environment variables
        original_vars = {}
        env_config = self.environments[env_name]
        
        try:
            # Set environment variables
            for key, value in env_config.items():
                if key in os.environ:
                    original_vars[key] = os.environ[key]
                os.environ[key] = str(value)
            
            self.current_env = env_name
            yield env_config
            
        finally:
            # Restore original environment variables
            for key in env_config.keys():
                if key in original_vars:
                    os.environ[key] = original_vars[key]
                else:
                    os.environ.pop(key, None)
            
            self.current_env = None

# Example configuration file (config.json):
"""
{
  "development": {
    "DATABASE_URL": "sqlite:///dev.db",
    "DEBUG": "true",
    "LOG_LEVEL": "DEBUG"
  },
  "testing": {
    "DATABASE_URL": "sqlite:///test.db",
    "DEBUG": "false",
    "LOG_LEVEL": "WARNING"
  },
  "production": {
    "DATABASE_URL": "postgresql://prod-server/db",
    "DEBUG": "false",
    "LOG_LEVEL": "ERROR"
  }
}
"""

# Usage
config_manager = ConfigManager("config.json")

def run_tests():
    with config_manager.environment("testing"):
        print(f"Running tests with DATABASE_URL: {os.environ['DATABASE_URL']}")
        print(f"Debug mode: {os.environ['DEBUG']}")
        # Run test suite

def deploy_to_production():
    with config_manager.environment("production"):
        print(f"Deploying with DATABASE_URL: {os.environ['DATABASE_URL']}")
        # Deployment logic

run_tests()
deploy_to_production()

Logging Context Manager

A context manager that automatically logs entry and exit from code blocks with execution time:

import logging
import time
from contextlib import contextmanager
from typing import Optional

# Set up logging
logging.basicConfig(level=logging.INFO, 
                   format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

@contextmanager
def logged_operation(operation_name: str, 
                    logger: Optional[logging.Logger] = None,
                    log_level: int = logging.INFO):
    """Context manager that logs operation start, end, and duration."""
    if logger is None:
        logger = logging.getLogger(__name__)
    
    start_time = time.perf_counter()
    logger.log(log_level, f"Starting operation: {operation_name}")
    
    try:
        yield logger
        end_time = time.perf_counter()
        duration = end_time - start_time
        logger.log(log_level, f"Operation '{operation_name}' completed successfully in {duration:.3f}s")
        
    except Exception as e:
        end_time = time.perf_counter()
        duration = end_time - start_time
        logger.error(f"Operation '{operation_name}' failed after {duration:.3f}s: {str(e)}")
        raise

# Usage
logger = logging.getLogger("my_app")

with logged_operation("Data processing", logger):
    # Simulate some work
    time.sleep(1)
    data = [i**2 for i in range(1000)]

with logged_operation("File upload", logger, logging.DEBUG):
    # This will be logged at DEBUG level
    time.sleep(0.5)

Performance Optimization and Best Practices

Understanding the performance implications of context managers and following best practices will help you write efficient, maintainable code.

Measuring Context Manager Overhead

Context managers do introduce a small amount of overhead, but it's usually negligible compared to the operations they're managing:

import time
from contextlib import contextmanager

# Minimal context manager for testing overhead
@contextmanager
def minimal_context():
    yield

def measure_overhead():
    iterations = 1000000
    
    # Test without context manager
    start = time.perf_counter()
    for _ in range(iterations):
        pass
    baseline = time.perf_counter() - start
    
    # Test with context manager
    start = time.perf_counter()
    for _ in range(iterations):
        with minimal_context():
            pass
    context_time = time.perf_counter() - start
    
    overhead = context_time - baseline
    print(f"Baseline: {baseline:.6f}s")
    print(f"With context manager: {context_time:.6f}s")
    print(f"Overhead: {overhead:.6f}s ({overhead/baseline*100:.2f}% increase)")
    print(f"Overhead per operation: {overhead/iterations*1000000:.2f} microseconds")

measure_overhead()

Memory Management Best Practices

When creating context managers that handle large resources, memory management becomes crucial:

import weakref
from contextlib import contextmanager
from typing import Dict, Any

class ResourceManager:
    """Memory-efficient resource manager using weak references."""
    
    def __init__(self):
        self._resources: Dict[str, Any] = {}
        self._cleanup_callbacks: Dict[str, callable] = {}
    
    @contextmanager
    def managed_resource(self, resource_id: str, resource_factory: callable, 
                        cleanup_callback: callable = None):
        """Manage a resource with automatic cleanup and memory efficiency."""
        
        # Check if resource already exists
        if resource_id in self._resources:
            resource = self._resources[resource_id]
            if resource is not None:  # Weak reference still valid
                yield resource
                return
        
        # Create new resource
        resource = resource_factory()
        
        try:
            # Store weak reference to avoid memory leaks
            self._resources[resource_id] = weakref.ref(resource)
            if cleanup_callback:
                self._cleanup_callbacks[resource_id] = cleanup_callback
            
            yield resource
            
        finally:
            # Cleanup
            if cleanup_callback:
                cleanup_callback(resource)
            
            # Remove from tracking
            self._resources.pop(resource_id, None)
            self._cleanup_callbacks.pop(resource_id, None)

# Usage
resource_manager = ResourceManager()

def create_large_dataset():
    """Factory function for creating expensive resources."""
    return list(range(1000000))  # Simulate large dataset

def cleanup_dataset(dataset):
    """Cleanup function."""
    print(f"Cleaning up dataset with {len(dataset)} items")
    dataset.clear()

with resource_manager.managed_resource(
    "large_dataset", 
    create_large_dataset, 
    cleanup_dataset
) as dataset:
    # Use the dataset
    result = sum(dataset[:1000])
    print(f"Sum of first 1000 items: {result}")
# Dataset automatically cleaned up

Thread-Safe Context Managers

When working with multi-threaded applications, ensuring thread safety is crucial:

import threading
import time
from contextlib import contextmanager
from typing import Dict, Any

class ThreadSafeResourcePool:
    """Thread-safe resource pool implementation."""
    
    def __init__(self, resource_factory: callable, max_resources: int = 10):
        self.resource_factory = resource_factory
        self.max_resources = max_resources
        self.available_resources = []
        self.in_use_resources = set()
        self.lock = threading.RLock()
        self.condition = threading.Condition(self.lock)
    
    @contextmanager
    def get_resource(self, timeout: float = None):
        """Get a resource from the pool in a thread-safe manner."""
        resource = None
        
        with self.condition:
            # Wait for available resource or ability to create new one
            start_time = time.time()
            while (not self.available_resources and 
                   len(self.in_use_resources) >= self.max_resources):
                
                if timeout:
                    elapsed = time.time() - start_time
                    remaining = timeout - elapsed
                    if remaining <= 0:
                        raise TimeoutError("Could not acquire resource within timeout")
                    self.condition.wait(remaining)
                else:
                    self.condition.wait()
            
            # Get or create resource
            if self.available_resources:
                resource = self.available_resources.pop()
            else:
                resource = self.resource_factory()
            
            self.in_use_resources.add(resource)
        
        try:
            yield resource
        finally:
            with self.condition:
                self.in_use_resources.discard(resource)
                self.available_resources.append(resource)
                self.condition.notify()

# Usage example
def create_connection():
    """Simulate expensive resource creation."""
    time.sleep(0.1)  # Simulate connection time
    return f"Connection-{threading.current_thread().ident}"

pool = ThreadSafeResourcePool(create_connection, max_resources=3)

def worker(worker_id: int):
    """Worker function that uses resources from the pool."""
    for i in range(3):
        with pool.get_resource(timeout=5.0) as conn:
            print(f"Worker {worker_id} using {conn}")
            time.sleep(0.2)  # Simulate work

# Run multiple workers
threads = []
for i in range(5):
    thread = threading.Thread(target=worker, args=(i,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

Integration with Async/Await

Modern Python applications often use async/await patterns. Context managers can be adapted for asynchronous code:

import asyncio
import aiohttp
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_http_session(base_url: str, timeout: int = 30):
    """Async context manager for HTTP sessions."""
    timeout_config = aiohttp.ClientTimeout(total=timeout)
    
    async with aiohttp.ClientSession(
        base_url=base_url, 
        timeout=timeout_config
    ) as session:
        yield session

@asynccontextmanager
async def async_database_transaction(connection):
    """Async context manager for database transactions."""
    async with connection.begin() as transaction:
        try:
            yield transaction
        except Exception:
            await transaction.rollback()
            raise
        else:
            await transaction.commit()

# Usage in async functions
async def fetch_user_data(user_id: int):
    async with async_http_session("https://api.example.com") as session:
        async with session.get(f"/users/{user_id}") as response:
            if response.status == 200:
                return await response.json()
            else:
                raise ValueError(f"User {user_id} not found")

# Run async code
async def main():
    user_data = await fetch_user_data(123)
    print(f"User data: {user_data}")

# asyncio.run(main())  # Uncomment to run

Conclusion

Mastering Python's with statement and context managers is essential for writing robust, maintainable code. We've covered everything from basic file handling to creating sophisticated custom context managers that can handle complex resource management scenarios.

The key takeaway? Always use context managers when dealing with resources that need cleanup. Your future self (and your application's users) will thank you for the extra reliability and cleaner code structure. Whether you're managing files, database connections, network resources, or temporary state changes, context managers provide an elegant and Pythonic solution.

Context managers embody one of Python's core principles: making the right thing easy to do. They automatically handle resource cleanup, provide clear code structure, and make exception handling more robust. As you continue developing Python applications, you'll find that context managers become an indispensable tool in your programming toolkit.

Ready to implement context managers in your next Python project? Start with simple file operations and gradually work your way up to custom implementations. Remember, good resource management is the foundation of professional Python development. The investment in learning these patterns will pay dividends in code quality, reliability, and maintainability for years to come!

Share this article

Add Comment

No comments yet. Be the first to comment!

More from Python