How to Handle Large Datasets with Chunking

Conquer memory limitations and process massive datasets efficiently using pandas' chunking strategies - your solution for big data on limited resources.

Big Data, Smart Processing

When a dataset is larger than your system's memory, loading it with a single pd.read_csv() call can exhaust RAM and crash your session. Pandas' chunking techniques let you process massive files piece by piece without overwhelming your hardware.
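
The core pattern is small: passing chunksize to pd.read_csv() returns an iterator that yields one DataFrame at a time, so only a single piece of the file ever sits in memory. A minimal sketch (the filename and 'amount' column match the sample dataset generated below):

import pandas as pd

running_total = 0.0
for chunk in pd.read_csv('large_dataset.csv', chunksize=100000):
    running_total += chunk['amount'].sum()  # only this chunk is in memory

print(f"Total amount: {running_total:,.2f}")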

Understanding the Memory Challenge

import pandas as pd
import numpy as np
import psutil
import os

def get_memory_usage():
    """Get current memory usage"""
    process = psutil.Process(os.getpid())
    memory_info = process.memory_info()
    return {
        'rss_mb': memory_info.rss / 1024 / 1024,  # Resident Set Size
        'vms_mb': memory_info.vms / 1024 / 1024   # Virtual Memory Size
    }

# Create a sample large dataset to demonstrate concepts
def create_sample_large_file(filename, num_rows=1000000):
    """Create a large CSV file for demonstration"""
    
    print(f"Creating sample file with {num_rows:,} rows...")
    
    # Generate data in chunks to avoid memory issues
    chunk_size = 100000
    header_written = False
    
    for start in range(0, num_rows, chunk_size):
        end = min(start + chunk_size, num_rows)
        current_chunk_size = end - start
        
        chunk_data = pd.DataFrame({
            'user_id': np.random.randint(1, 100000, current_chunk_size),
            'transaction_date': pd.date_range(start=pd.Timestamp('2020-01-01') + pd.Timedelta(hours=start),
                                              periods=current_chunk_size, freq='1H'),  # continue timestamps across chunks
            'amount': np.random.uniform(10, 1000, current_chunk_size),
            'category': np.random.choice(['Food', 'Transport', 'Entertainment', 'Shopping', 'Bills'], 
                                       current_chunk_size),
            'merchant': np.random.choice([f'Merchant_{i}' for i in range(1, 1001)], current_chunk_size)
        })
        
        # Write to file
        mode = 'w' if not header_written else 'a'
        chunk_data.to_csv(filename, mode=mode, header=not header_written, index=False)
        header_written = True
        
        print(f"  Written {end:,} rows...")
        
    print(f"Sample file created: {filename}")

# Create sample file (comment out if file already exists)
# create_sample_large_file('large_dataset.csv', 1000000)

print("Initial memory usage:")
print(get_memory_usage())

Basic Chunking with read_csv()

# Basic chunking approach
def process_large_csv_basic(filename, chunk_size=10000):
    """Process large CSV file in chunks"""
    
    print(f"Processing {filename} in chunks of {chunk_size:,} rows")
    
    chunk_results = []
    memory_usage = []
    
    # Process file in chunks
    for i, chunk in enumerate(pd.read_csv(filename, chunksize=chunk_size)):
        print(f"Processing chunk {i+1}: {len(chunk):,} rows")
        
        # Example processing: calculate statistics for each chunk
        chunk_stats = {
            'chunk_number': i + 1,
            'rows': len(chunk),
            'avg_amount': chunk['amount'].mean(),
            'total_amount': chunk['amount'].sum(),
            'unique_users': chunk['user_id'].nunique(),
            'date_range': f"{chunk['transaction_date'].min()} to {chunk['transaction_date'].max()}"
        }
        
        chunk_results.append(chunk_stats)
        memory_usage.append(get_memory_usage()['rss_mb'])
        
        # Optional: Clear chunk from memory explicitly
        del chunk
        
        if i >= 4:  # Process only first 5 chunks for demo
            break
    
    # Combine results
    results_df = pd.DataFrame(chunk_results)
    print("\nChunk processing results:")
    print(results_df)
    
    print(f"\nMemory usage range: {min(memory_usage):.1f} - {max(memory_usage):.1f} MB")
    
    return results_df

# Example usage
# chunk_results = process_large_csv_basic('large_dataset.csv')
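
If you prefer to pull rows on demand rather than looping over a fixed chunk size, read_csv also accepts iterator=True and returns a TextFileReader whose get_chunk() method reads an arbitrary number of rows at a time. A minimal sketch against the same sample file (pandas 1.2+ lets the reader be used as a context manager):

with pd.read_csv('large_dataset.csv', iterator=True) as reader:
    first_batch = reader.get_chunk(5000)     # first 5,000 rows
    second_batch = reader.get_chunk(20000)   # next 20,000 rows
    print(len(first_batch), len(second_batch))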

Advanced Chunking Strategies

class LargeDatasetProcessor:
    """Advanced processor for large datasets"""
    
    def __init__(self, filename, chunk_size=50000):
        self.filename = filename
        self.chunk_size = chunk_size
        self.total_rows = None
        self.columns = None
        
    def get_file_info(self):
        """Get basic file information without loading all data"""
        
        # Read just the first chunk to get column info
        first_chunk = pd.read_csv(self.filename, nrows=1000)
        self.columns = first_chunk.columns.tolist()
        
        # Count total rows (memory efficient)
        total_rows = 0
        for chunk in pd.read_csv(self.filename, chunksize=self.chunk_size, usecols=[0]):
            total_rows += len(chunk)
        
        self.total_rows = total_rows
        
        return {
            'total_rows': self.total_rows,
            'columns': self.columns,
            'estimated_chunks': (self.total_rows + self.chunk_size - 1) // self.chunk_size  # ceiling division
        }
    
    def filter_and_aggregate(self, filter_conditions=None, group_by=None, agg_func=None):
        """Filter data and perform aggregations in chunks"""
        
        results = []
        processed_rows = 0
        
        for chunk_num, chunk in enumerate(pd.read_csv(self.filename, chunksize=self.chunk_size)):
            
            # Apply filters if specified
            if filter_conditions:
                for column, condition in filter_conditions.items():
                    if condition['operator'] == '>':
                        chunk = chunk[chunk[column] > condition['value']]
                    elif condition['operator'] == '==':
                        chunk = chunk[chunk[column] == condition['value']]
                    elif condition['operator'] == 'isin':
                        chunk = chunk[chunk[column].isin(condition['value'])]
            
            # Skip empty chunks after filtering
            if len(chunk) == 0:
                continue
            
            # Perform aggregation if specified
            if group_by and agg_func:
                chunk_result = chunk.groupby(group_by).agg(agg_func).reset_index()
                results.append(chunk_result)
            else:
                results.append(chunk)
            
            processed_rows += len(chunk)
            
            if chunk_num % 10 == 0:
                print(f"Processed chunk {chunk_num + 1}, rows so far: {processed_rows:,}")
        
        # Combine results
        if results:
            final_result = pd.concat(results, ignore_index=True)
            
            # Re-aggregate the combined per-chunk results. Applying the same
            # function twice is exact for sum/min/max, but not for mean or
            # count, which need to be combined with weights or summed instead.
            if group_by and agg_func:
                final_result = final_result.groupby(group_by).agg(agg_func).reset_index()
            
            return final_result
        else:
            return pd.DataFrame()
    
    def streaming_statistics(self):
        """Calculate statistics across the entire dataset efficiently"""
        
        # Initialize accumulators
        total_rows = 0
        sum_amount = 0
        sum_squared_amount = 0
        min_amount = float('inf')
        max_amount = float('-inf')
        unique_users = set()
        category_counts = {}
        
        for chunk in pd.read_csv(self.filename, chunksize=self.chunk_size):
            
            # Update basic statistics
            total_rows += len(chunk)
            sum_amount += chunk['amount'].sum()
            sum_squared_amount += (chunk['amount'] ** 2).sum()
            min_amount = min(min_amount, chunk['amount'].min())
            max_amount = max(max_amount, chunk['amount'].max())
            
            # Update unique users (memory efficient for reasonable cardinality)
            unique_users.update(chunk['user_id'].unique())
            
            # Update category counts
            chunk_categories = chunk['category'].value_counts()
            for category, count in chunk_categories.items():
                category_counts[category] = category_counts.get(category, 0) + count
        
        # Calculate final statistics
        mean_amount = sum_amount / total_rows
        variance_amount = (sum_squared_amount / total_rows) - (mean_amount ** 2)
        std_amount = np.sqrt(variance_amount)
        
        return {
            'total_rows': total_rows,
            'unique_users': len(unique_users),
            'amount_statistics': {
                'mean': mean_amount,
                'std': std_amount,
                'min': min_amount,
                'max': max_amount,
                'total': sum_amount
            },
            'category_distribution': category_counts
        }

# Example usage
# processor = LargeDatasetProcessor('large_dataset.csv', chunk_size=100000)
# file_info = processor.get_file_info()
# print("File information:", file_info)

# statistics = processor.streaming_statistics()
# print("Dataset statistics:", statistics)

Memory-Optimized Data Types

def optimize_chunk_dtypes(chunk):
    """Optimize data types for memory efficiency"""
    
    optimized_chunk = chunk.copy()
    
    # Optimize integer columns
    for col in optimized_chunk.select_dtypes(include=['int64']).columns:
        col_min = optimized_chunk[col].min()
        col_max = optimized_chunk[col].max()
        
        if col_min >= 0:  # Unsigned integers
            if col_max <= 255:
                optimized_chunk[col] = optimized_chunk[col].astype('uint8')
            elif col_max <= 65535:
                optimized_chunk[col] = optimized_chunk[col].astype('uint16')
            elif col_max <= 4294967295:
                optimized_chunk[col] = optimized_chunk[col].astype('uint32')
        else:  # Signed integers
            if col_min >= -128 and col_max <= 127:
                optimized_chunk[col] = optimized_chunk[col].astype('int8')
            elif col_min >= -32768 and col_max <= 32767:
                optimized_chunk[col] = optimized_chunk[col].astype('int16')
            elif col_min >= -2147483648 and col_max <= 2147483647:
                optimized_chunk[col] = optimized_chunk[col].astype('int32')
    
    # Optimize float columns
    for col in optimized_chunk.select_dtypes(include=['float64']).columns:
        optimized_chunk[col] = pd.to_numeric(optimized_chunk[col], downcast='float')
    
    # Convert repeated strings to categories
    for col in optimized_chunk.select_dtypes(include=['object']).columns:
        if optimized_chunk[col].nunique() < len(optimized_chunk) * 0.5:
            optimized_chunk[col] = optimized_chunk[col].astype('category')
    
    return optimized_chunk

def process_with_optimization(filename, chunk_size=50000):
    """Process file with memory optimization"""
    
    memory_savings = []
    
    for i, chunk in enumerate(pd.read_csv(filename, chunksize=chunk_size)):
        
        # Memory usage before optimization
        memory_before = chunk.memory_usage(deep=True).sum()
        
        # Optimize chunk
        optimized_chunk = optimize_chunk_dtypes(chunk)
        
        # Memory usage after optimization
        memory_after = optimized_chunk.memory_usage(deep=True).sum()
        
        savings = ((memory_before - memory_after) / memory_before) * 100
        memory_savings.append(savings)
        
        print(f"Chunk {i+1}: {savings:.1f}% memory saved")
        
        # Process optimized chunk here
        # ... your processing logic ...
        
        if i >= 4:  # Demo first 5 chunks
            break
    
    print(f"Average memory savings: {np.mean(memory_savings):.1f}%")

# Example usage
# process_with_optimization('large_dataset.csv')
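
Often you can skip the post-hoc conversion entirely by telling read_csv the target dtypes up front, so each chunk is compact from the moment it is parsed. A sketch assuming the sample file's columns (note that with chunked reads, category levels are inferred per chunk and should be aligned before concatenating):

# Declare compact dtypes at read time so every chunk arrives already optimized
dtype_map = {
    'user_id': 'uint32',
    'amount': 'float32',
    'category': 'category',
    'merchant': 'category'
}

for chunk in pd.read_csv('large_dataset.csv', chunksize=50000,
                         dtype=dtype_map, parse_dates=['transaction_date']):
    # ... your processing logic ...
    pass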

Parallel Processing with Chunks

from concurrent.futures import ProcessPoolExecutor
import multiprocessing

def process_chunk(chunk_data):
    """Process a single chunk (CPU-intensive task)"""
    
    # Example: Complex calculation on chunk
    chunk_data['amount_squared'] = chunk_data['amount'] ** 2
    chunk_data['user_transaction_rank'] = chunk_data.groupby('user_id')['amount'].rank()
    
    # Return summary statistics
    return {
        'chunk_size': len(chunk_data),
        'total_amount': chunk_data['amount'].sum(),
        'avg_amount': chunk_data['amount'].mean(),
        'unique_users': chunk_data['user_id'].nunique(),
        'max_rank': chunk_data['user_transaction_rank'].max()
    }

def parallel_chunk_processing(filename, chunk_size=50000, max_workers=None):
    """Process chunks in parallel"""
    
    if max_workers is None:
        max_workers = max(1, multiprocessing.cpu_count() - 1)  # leave one core free, but never zero workers
    
    print(f"Processing with {max_workers} parallel workers")
    
    # Read chunks into memory first (for small to medium datasets)
    chunks = []
    for i, chunk in enumerate(pd.read_csv(filename, chunksize=chunk_size)):
        chunks.append(chunk)
        if i >= 9:  # Limit for demo
            break
    
    print(f"Loaded {len(chunks)} chunks for parallel processing")
    
    # Process chunks in parallel
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(process_chunk, chunks))
    
    # Combine results
    results_df = pd.DataFrame(results)
    print("\nParallel processing results:")
    print(results_df)
    
    return results_df

# Example usage (uncomment to run)
# parallel_results = parallel_chunk_processing('large_dataset.csv')
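
One practical caveat before running this: ProcessPoolExecutor re-imports the script in each worker, so process_chunk must be defined at module level, and on platforms that default to the "spawn" start method (Windows, and macOS since Python 3.8) the call needs to sit behind an if __name__ == '__main__' guard:

if __name__ == '__main__':
    # Guard required for the "spawn" start method used on Windows and macOS
    parallel_results = parallel_chunk_processing('large_dataset.csv')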

Streaming Data Processing

class StreamingProcessor:
    """Process data as a continuous stream"""
    
    def __init__(self, filename, chunk_size=10000):
        self.filename = filename
        self.chunk_size = chunk_size
        self.processed_count = 0
        self.running_stats = {
            'count': 0,
            'sum': 0,
            'sum_squared': 0,
            'min': float('inf'),
            'max': float('-inf')
        }
    
    def update_running_stats(self, chunk):
        """Update running statistics with new chunk"""
        
        amount_data = chunk['amount']
        
        self.running_stats['count'] += len(amount_data)
        self.running_stats['sum'] += amount_data.sum()
        self.running_stats['sum_squared'] += (amount_data ** 2).sum()
        self.running_stats['min'] = min(self.running_stats['min'], amount_data.min())
        self.running_stats['max'] = max(self.running_stats['max'], amount_data.max())
    
    def get_current_stats(self):
        """Get current statistics"""
        
        if self.running_stats['count'] == 0:
            return None
        
        mean = self.running_stats['sum'] / self.running_stats['count']
        variance = (self.running_stats['sum_squared'] / self.running_stats['count']) - (mean ** 2)
        std = np.sqrt(variance) if variance > 0 else 0
        
        return {
            'count': self.running_stats['count'],
            'mean': mean,
            'std': std,
            'min': self.running_stats['min'],
            'max': self.running_stats['max']
        }
    
    def stream_process(self, callback=None):
        """Stream process the file"""
        
        for chunk_num, chunk in enumerate(pd.read_csv(self.filename, chunksize=self.chunk_size)):
            
            # Update statistics
            self.update_running_stats(chunk)
            self.processed_count += len(chunk)
            
            # Optional callback for custom processing
            if callback:
                callback(chunk, chunk_num)
            
            # Print progress
            if chunk_num % 10 == 0:
                current_stats = self.get_current_stats()
                print(f"Processed {self.processed_count:,} rows. Current mean: {current_stats['mean']:.2f}")
        
        return self.get_current_stats()

# Custom callback function
def custom_chunk_callback(chunk, chunk_num):
    """Custom processing for each chunk"""
    
    # Example: Alert for high-value transactions
    high_value = chunk[chunk['amount'] > 500]
    if len(high_value) > 0:
        print(f"  Chunk {chunk_num}: Found {len(high_value)} high-value transactions")

# Example usage
# streaming_processor = StreamingProcessor('large_dataset.csv', chunk_size=50000)
# final_stats = streaming_processor.stream_process(callback=custom_chunk_callback)
# print("Final statistics:", final_stats)

Best Practices and Performance Tips

def chunking_best_practices():
    """Best practices for chunking large datasets"""
    
    practices = {
        'Chunk Size Selection': [
            "Start with 10,000-100,000 rows per chunk",
            "Adjust based on available RAM and processing complexity",
            "Smaller chunks for complex operations, larger for simple ones"
        ],
        
        'Memory Management': [
            "Use appropriate data types (uint8 vs int64)",
            "Convert strings to categories when appropriate", 
            "Use del or gc.collect() to free memory when needed",
            "Monitor memory usage throughout processing"
        ],
        
        'Processing Efficiency': [
            "Process chunks in parallel when operations are independent",
            "Use vectorized operations within chunks",
            "Avoid growing lists - preallocate when possible",
            "Consider using generators for very large datasets"
        ],
        
        'Error Handling': [
            "Implement robust error handling for individual chunks",
            "Log progress to enable resuming from failures",
            "Validate data types and ranges in each chunk",
            "Handle missing or corrupted data gracefully"
        ]
    }
    
    print("=== CHUNKING BEST PRACTICES ===")
    for category, tips in practices.items():
        print(f"\n{category}:")
        for tip in tips:
            print(f"  ✅ {tip}")

chunking_best_practices()
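
The "consider using generators" tip above can be made concrete with a small pipeline: each stage yields chunks to the next, so nothing larger than a single chunk is ever held in memory. A sketch:

def read_chunks(filename, chunk_size=50000):
    """Yield raw chunks from disk"""
    for chunk in pd.read_csv(filename, chunksize=chunk_size):
        yield chunk

def clean_chunks(chunks):
    """Yield chunks with non-positive amounts removed"""
    for chunk in chunks:
        yield chunk[chunk['amount'] > 0]

def total_amount(chunks):
    """Consume the pipeline and return a single number"""
    return sum(chunk['amount'].sum() for chunk in chunks)

# grand_total = total_amount(clean_chunks(read_chunks('large_dataset.csv')))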

# Performance monitoring
def monitor_chunking_performance(filename, chunk_sizes=[10000, 50000, 100000]):
    """Compare performance across different chunk sizes"""
    
    import time
    
    performance_results = []
    
    for chunk_size in chunk_sizes:
        print(f"\nTesting chunk size: {chunk_size:,}")
        
        start_time = time.time()
        start_memory = get_memory_usage()['rss_mb']
        
        # Simple processing test
        total_rows = 0
        for chunk in pd.read_csv(filename, chunksize=chunk_size):
            total_rows += len(chunk)
            # Simple operation
            _ = chunk['amount'].mean()
            
            if total_rows >= 500000:  # Limit for demo
                break
        
        end_time = time.time()
        end_memory = get_memory_usage()['rss_mb']
        
        performance_results.append({
            'chunk_size': chunk_size,
            'processing_time': end_time - start_time,
            'memory_used': end_memory - start_memory,
            'rows_processed': total_rows
        })
    
    performance_df = pd.DataFrame(performance_results)
    print("\nPerformance comparison:")
    print(performance_df)
    
    return performance_df

# Example usage (uncomment to run)
# performance_comparison = monitor_chunking_performance('large_dataset.csv')

Master Big Data Processing

Explore distributed computing with Dask, learn memory profiling techniques, and discover scalable data pipeline architectures.
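
As a taste of where that leads, Dask's DataFrame API mirrors pandas while splitting the file into partitions and scheduling the work for you; a minimal sketch, assuming dask is installed and the same sample file:

import dask.dataframe as dd

# Dask reads the CSV as many partitions and evaluates lazily until .compute()
ddf = dd.read_csv('large_dataset.csv')
category_totals = ddf.groupby('category')['amount'].sum().compute()
print(category_totals)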
