Table Of Contents
- Big Data, Smart Processing
- Understanding the Memory Challenge
- Basic Chunking with read_csv()
- Advanced Chunking Strategies
- Memory-Optimized Data Types
- Parallel Processing with Chunks
- Streaming Data Processing
- Best Practices and Performance Tips
- Master Big Data Processing
Big Data, Smart Processing
When a dataset exceeds your system's memory, loading it in one go can exhaust RAM and crash your process. Master pandas' chunking techniques to process massive files efficiently without overwhelming your hardware.
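The core idea is simple: passing chunksize to pd.read_csv() returns an iterator of DataFrames instead of one giant DataFrame, so memory use stays bounded by the chunk size. A minimal sketch (the file name is a placeholder); the sections below flesh this out:

import pandas as pd

total_rows = 0
# chunksize turns read_csv into an iterator that yields one DataFrame per chunk
for chunk in pd.read_csv('huge_file.csv', chunksize=100_000):
    total_rows += len(chunk)  # per-chunk work goes here
print(f"{total_rows:,} rows processed without ever loading the whole file")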
Understanding the Memory Challenge
import pandas as pd
import numpy as np
import psutil
import os

def get_memory_usage():
    """Get current memory usage"""
    process = psutil.Process(os.getpid())
    memory_info = process.memory_info()
    return {
        'rss_mb': memory_info.rss / 1024 / 1024,  # Resident Set Size
        'vms_mb': memory_info.vms / 1024 / 1024   # Virtual Memory Size
    }

# Create a sample large dataset to demonstrate concepts
def create_sample_large_file(filename, num_rows=1000000):
    """Create a large CSV file for demonstration"""
    print(f"Creating sample file with {num_rows:,} rows...")

    # Generate data in chunks to avoid memory issues
    chunk_size = 100000
    header_written = False

    for start in range(0, num_rows, chunk_size):
        end = min(start + chunk_size, num_rows)
        current_chunk_size = end - start

        chunk_data = pd.DataFrame({
            'user_id': np.random.randint(1, 100000, current_chunk_size),
            'transaction_date': pd.date_range('2020-01-01', periods=current_chunk_size, freq='1H'),
            'amount': np.random.uniform(10, 1000, current_chunk_size),
            'category': np.random.choice(['Food', 'Transport', 'Entertainment', 'Shopping', 'Bills'],
                                         current_chunk_size),
            'merchant': np.random.choice([f'Merchant_{i}' for i in range(1, 1001)], current_chunk_size)
        })

        # Write to file
        mode = 'w' if not header_written else 'a'
        chunk_data.to_csv(filename, mode=mode, header=not header_written, index=False)
        header_written = True

        print(f"  Written {end:,} rows...")

    print(f"Sample file created: {filename}")

# Create sample file (comment out if file already exists)
# create_sample_large_file('large_dataset.csv', 1000000)

print("Initial memory usage:")
print(get_memory_usage())
Basic Chunking with read_csv()
# Basic chunking approach
def process_large_csv_basic(filename, chunk_size=10000):
    """Process large CSV file in chunks"""
    print(f"Processing {filename} in chunks of {chunk_size:,} rows")

    chunk_results = []
    memory_usage = []

    # Process file in chunks
    for i, chunk in enumerate(pd.read_csv(filename, chunksize=chunk_size)):
        print(f"Processing chunk {i+1}: {len(chunk):,} rows")

        # Example processing: calculate statistics for each chunk
        chunk_stats = {
            'chunk_number': i + 1,
            'rows': len(chunk),
            'avg_amount': chunk['amount'].mean(),
            'total_amount': chunk['amount'].sum(),
            'unique_users': chunk['user_id'].nunique(),
            'date_range': f"{chunk['transaction_date'].min()} to {chunk['transaction_date'].max()}"
        }
        chunk_results.append(chunk_stats)
        memory_usage.append(get_memory_usage()['rss_mb'])

        # Optional: Clear chunk from memory explicitly
        del chunk

        if i >= 4:  # Process only first 5 chunks for demo
            break

    # Combine results
    results_df = pd.DataFrame(chunk_results)

    print("\nChunk processing results:")
    print(results_df)
    print(f"\nMemory usage range: {min(memory_usage):.1f} - {max(memory_usage):.1f} MB")

    return results_df

# Example usage
# chunk_results = process_large_csv_basic('large_dataset.csv')
Advanced Chunking Strategies
class LargeDatasetProcessor:
    """Advanced processor for large datasets"""

    def __init__(self, filename, chunk_size=50000):
        self.filename = filename
        self.chunk_size = chunk_size
        self.total_rows = None
        self.columns = None

    def get_file_info(self):
        """Get basic file information without loading all data"""
        # Read just the first chunk to get column info
        first_chunk = pd.read_csv(self.filename, nrows=1000)
        self.columns = first_chunk.columns.tolist()

        # Count total rows (memory efficient: only the first column is read)
        total_rows = 0
        for chunk in pd.read_csv(self.filename, chunksize=self.chunk_size, usecols=[0]):
            total_rows += len(chunk)
        self.total_rows = total_rows

        return {
            'total_rows': self.total_rows,
            'columns': self.columns,
            'estimated_chunks': -(-self.total_rows // self.chunk_size)  # ceiling division
        }

    def filter_and_aggregate(self, filter_conditions=None, group_by=None, agg_func=None):
        """Filter data and perform aggregations in chunks"""
        results = []
        processed_rows = 0

        for chunk_num, chunk in enumerate(pd.read_csv(self.filename, chunksize=self.chunk_size)):
            # Apply filters if specified
            if filter_conditions:
                for column, condition in filter_conditions.items():
                    if condition['operator'] == '>':
                        chunk = chunk[chunk[column] > condition['value']]
                    elif condition['operator'] == '==':
                        chunk = chunk[chunk[column] == condition['value']]
                    elif condition['operator'] == 'isin':
                        chunk = chunk[chunk[column].isin(condition['value'])]

            # Skip empty chunks after filtering
            if len(chunk) == 0:
                continue

            # Perform aggregation if specified
            if group_by and agg_func:
                chunk_result = chunk.groupby(group_by).agg(agg_func).reset_index()
                results.append(chunk_result)
            else:
                results.append(chunk)

            processed_rows += len(chunk)
            if chunk_num % 10 == 0:
                print(f"Processed chunk {chunk_num + 1}, rows so far: {processed_rows:,}")

        # Combine results
        if results:
            final_result = pd.concat(results, ignore_index=True)

            # Final aggregation if needed. Note: re-aggregating per-chunk results is only
            # exact for functions like 'sum', 'count', 'min' and 'max'; a 'mean' of chunk
            # means would weight every chunk equally regardless of its size.
            if group_by and agg_func:
                final_result = final_result.groupby(group_by).agg(agg_func).reset_index()

            return final_result
        else:
            return pd.DataFrame()

    def streaming_statistics(self):
        """Calculate statistics across the entire dataset efficiently"""
        # Initialize accumulators
        total_rows = 0
        sum_amount = 0
        sum_squared_amount = 0
        min_amount = float('inf')
        max_amount = float('-inf')
        unique_users = set()
        category_counts = {}

        for chunk in pd.read_csv(self.filename, chunksize=self.chunk_size):
            # Update basic statistics
            total_rows += len(chunk)
            sum_amount += chunk['amount'].sum()
            sum_squared_amount += (chunk['amount'] ** 2).sum()
            min_amount = min(min_amount, chunk['amount'].min())
            max_amount = max(max_amount, chunk['amount'].max())

            # Update unique users (memory efficient for reasonable cardinality)
            unique_users.update(chunk['user_id'].unique())

            # Update category counts
            chunk_categories = chunk['category'].value_counts()
            for category, count in chunk_categories.items():
                category_counts[category] = category_counts.get(category, 0) + count

        # Calculate final statistics
        mean_amount = sum_amount / total_rows
        variance_amount = (sum_squared_amount / total_rows) - (mean_amount ** 2)
        std_amount = np.sqrt(variance_amount)

        return {
            'total_rows': total_rows,
            'unique_users': len(unique_users),
            'amount_statistics': {
                'mean': mean_amount,
                'std': std_amount,
                'min': min_amount,
                'max': max_amount,
                'total': sum_amount
            },
            'category_distribution': category_counts
        }

# Example usage
# processor = LargeDatasetProcessor('large_dataset.csv', chunk_size=100000)
# file_info = processor.get_file_info()
# print("File information:", file_info)
# statistics = processor.streaming_statistics()
# print("Dataset statistics:", statistics)
Memory-Optimized Data Types
def optimize_chunk_dtypes(chunk):
    """Optimize data types for memory efficiency"""
    optimized_chunk = chunk.copy()

    # Optimize integer columns
    for col in optimized_chunk.select_dtypes(include=['int64']).columns:
        col_min = optimized_chunk[col].min()
        col_max = optimized_chunk[col].max()

        if col_min >= 0:  # Unsigned integers
            if col_max <= 255:
                optimized_chunk[col] = optimized_chunk[col].astype('uint8')
            elif col_max <= 65535:
                optimized_chunk[col] = optimized_chunk[col].astype('uint16')
            elif col_max <= 4294967295:
                optimized_chunk[col] = optimized_chunk[col].astype('uint32')
        else:  # Signed integers
            if col_min >= -128 and col_max <= 127:
                optimized_chunk[col] = optimized_chunk[col].astype('int8')
            elif col_min >= -32768 and col_max <= 32767:
                optimized_chunk[col] = optimized_chunk[col].astype('int16')
            elif col_min >= -2147483648 and col_max <= 2147483647:
                optimized_chunk[col] = optimized_chunk[col].astype('int32')

    # Optimize float columns
    for col in optimized_chunk.select_dtypes(include=['float64']).columns:
        optimized_chunk[col] = pd.to_numeric(optimized_chunk[col], downcast='float')

    # Convert repeated strings to categories
    for col in optimized_chunk.select_dtypes(include=['object']).columns:
        if optimized_chunk[col].nunique() < len(optimized_chunk) * 0.5:
            optimized_chunk[col] = optimized_chunk[col].astype('category')

    return optimized_chunk

def process_with_optimization(filename, chunk_size=50000):
    """Process file with memory optimization"""
    memory_savings = []

    for i, chunk in enumerate(pd.read_csv(filename, chunksize=chunk_size)):
        # Memory usage before optimization
        memory_before = chunk.memory_usage(deep=True).sum()

        # Optimize chunk
        optimized_chunk = optimize_chunk_dtypes(chunk)

        # Memory usage after optimization
        memory_after = optimized_chunk.memory_usage(deep=True).sum()

        savings = ((memory_before - memory_after) / memory_before) * 100
        memory_savings.append(savings)

        print(f"Chunk {i+1}: {savings:.1f}% memory saved")

        # Process optimized chunk here
        # ... your processing logic ...

        if i >= 4:  # Demo first 5 chunks
            break

    print(f"Average memory savings: {np.mean(memory_savings):.1f}%")

# Example usage
# process_with_optimization('large_dataset.csv')
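Post-hoc conversion works, but pandas can also apply compact types while reading: read_csv accepts a dtype mapping (and parse_dates), so each chunk arrives already optimized and the int64/object versions never exist in memory. A minimal sketch, assuming the demo file's columns:

# Apply memory-friendly dtypes at read time instead of converting afterwards
optimized_reader = pd.read_csv(
    'large_dataset.csv',
    chunksize=50000,
    dtype={'user_id': 'uint32', 'category': 'category', 'merchant': 'category'},
    parse_dates=['transaction_date']
)
# for chunk in optimized_reader:
#     ...  # each chunk already uses the smaller dtypes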
Parallel Processing with Chunks
from concurrent.futures import ProcessPoolExecutor
import multiprocessing

def process_chunk(chunk_data):
    """Process a single chunk (CPU-intensive task)"""
    # Example: Complex calculation on chunk
    chunk_data['amount_squared'] = chunk_data['amount'] ** 2
    chunk_data['user_transaction_rank'] = chunk_data.groupby('user_id')['amount'].rank()

    # Return summary statistics
    return {
        'chunk_size': len(chunk_data),
        'total_amount': chunk_data['amount'].sum(),
        'avg_amount': chunk_data['amount'].mean(),
        'unique_users': chunk_data['user_id'].nunique(),
        'max_rank': chunk_data['user_transaction_rank'].max()
    }

def parallel_chunk_processing(filename, chunk_size=50000, max_workers=None):
    """Process chunks in parallel"""
    if max_workers is None:
        max_workers = max(1, multiprocessing.cpu_count() - 1)

    print(f"Processing with {max_workers} parallel workers")

    # Read chunks into memory first (for small to medium datasets)
    chunks = []
    for i, chunk in enumerate(pd.read_csv(filename, chunksize=chunk_size)):
        chunks.append(chunk)
        if i >= 9:  # Limit for demo
            break

    print(f"Loaded {len(chunks)} chunks for parallel processing")

    # Process chunks in parallel (on Windows/macOS, call this from inside an
    # `if __name__ == '__main__':` guard because worker processes are spawned)
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(process_chunk, chunks))

    # Combine results
    results_df = pd.DataFrame(results)
    print("\nParallel processing results:")
    print(results_df)

    return results_df

# Example usage (uncomment to run)
# parallel_results = parallel_chunk_processing('large_dataset.csv')
Streaming Data Processing
class StreamingProcessor:
    """Process data as a continuous stream"""

    def __init__(self, filename, chunk_size=10000):
        self.filename = filename
        self.chunk_size = chunk_size
        self.processed_count = 0
        self.running_stats = {
            'count': 0,
            'sum': 0,
            'sum_squared': 0,
            'min': float('inf'),
            'max': float('-inf')
        }

    def update_running_stats(self, chunk):
        """Update running statistics with new chunk"""
        amount_data = chunk['amount']
        self.running_stats['count'] += len(amount_data)
        self.running_stats['sum'] += amount_data.sum()
        self.running_stats['sum_squared'] += (amount_data ** 2).sum()
        self.running_stats['min'] = min(self.running_stats['min'], amount_data.min())
        self.running_stats['max'] = max(self.running_stats['max'], amount_data.max())

    def get_current_stats(self):
        """Get current statistics"""
        if self.running_stats['count'] == 0:
            return None

        mean = self.running_stats['sum'] / self.running_stats['count']
        variance = (self.running_stats['sum_squared'] / self.running_stats['count']) - (mean ** 2)
        std = np.sqrt(variance) if variance > 0 else 0

        return {
            'count': self.running_stats['count'],
            'mean': mean,
            'std': std,
            'min': self.running_stats['min'],
            'max': self.running_stats['max']
        }

    def stream_process(self, callback=None):
        """Stream process the file"""
        for chunk_num, chunk in enumerate(pd.read_csv(self.filename, chunksize=self.chunk_size)):
            # Update statistics
            self.update_running_stats(chunk)
            self.processed_count += len(chunk)

            # Optional callback for custom processing
            if callback:
                callback(chunk, chunk_num)

            # Print progress
            if chunk_num % 10 == 0:
                current_stats = self.get_current_stats()
                print(f"Processed {self.processed_count:,} rows. Current mean: {current_stats['mean']:.2f}")

        return self.get_current_stats()

# Custom callback function
def custom_chunk_callback(chunk, chunk_num):
    """Custom processing for each chunk"""
    # Example: Alert for high-value transactions
    high_value = chunk[chunk['amount'] > 500]
    if len(high_value) > 0:
        print(f"  Chunk {chunk_num}: Found {len(high_value)} high-value transactions")

# Example usage
# streaming_processor = StreamingProcessor('large_dataset.csv', chunk_size=50000)
# final_stats = streaming_processor.stream_process(callback=custom_chunk_callback)
# print("Final statistics:", final_stats)
Best Practices and Performance Tips
def chunking_best_practices():
    """Best practices for chunking large datasets"""
    practices = {
        'Chunk Size Selection': [
            "Start with 10,000-100,000 rows per chunk",
            "Adjust based on available RAM and processing complexity",
            "Smaller chunks for complex operations, larger for simple ones"
        ],
        'Memory Management': [
            "Use appropriate data types (uint8 vs int64)",
            "Convert strings to categories when appropriate",
            "Use del or gc.collect() to free memory when needed",
            "Monitor memory usage throughout processing"
        ],
        'Processing Efficiency': [
            "Process chunks in parallel when operations are independent",
            "Use vectorized operations within chunks",
            "Avoid growing lists - preallocate when possible",
            "Consider using generators for very large datasets"
        ],
        'Error Handling': [
            "Implement robust error handling for individual chunks",
            "Log progress to enable resuming from failures",
            "Validate data types and ranges in each chunk",
            "Handle missing or corrupted data gracefully"
        ]
    }

    print("=== CHUNKING BEST PRACTICES ===")
    for category, tips in practices.items():
        print(f"\n{category}:")
        for tip in tips:
            print(f"  ✅ {tip}")

chunking_best_practices()
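# The "use generators" tip above in practice: a minimal sketch of a generator
# pipeline (the 'amount' column is assumed from the demo dataset). Each stage
# yields one chunk at a time, so only a single chunk is ever held in memory.
def filtered_chunks(filename, chunk_size=50000, min_amount=100):
    """Yield chunks with low-value rows already removed"""
    for chunk in pd.read_csv(filename, chunksize=chunk_size):
        yield chunk[chunk['amount'] >= min_amount]

def chunk_totals(chunks):
    """Yield the total amount of each filtered chunk"""
    for chunk in chunks:
        yield chunk['amount'].sum()

# Example usage (uncomment to run)
# grand_total = sum(chunk_totals(filtered_chunks('large_dataset.csv')))
# print(f"Total amount above 100: {grand_total:,.2f}")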
# Performance monitoring
def monitor_chunking_performance(filename, chunk_sizes=[10000, 50000, 100000]):
    """Compare performance across different chunk sizes"""
    import time

    performance_results = []

    for chunk_size in chunk_sizes:
        print(f"\nTesting chunk size: {chunk_size:,}")

        start_time = time.time()
        start_memory = get_memory_usage()['rss_mb']

        # Simple processing test
        total_rows = 0
        for chunk in pd.read_csv(filename, chunksize=chunk_size):
            total_rows += len(chunk)
            # Simple operation
            _ = chunk['amount'].mean()

            if total_rows >= 500000:  # Limit for demo
                break

        end_time = time.time()
        end_memory = get_memory_usage()['rss_mb']

        performance_results.append({
            'chunk_size': chunk_size,
            'processing_time': end_time - start_time,
            'memory_used': end_memory - start_memory,
            'rows_processed': total_rows
        })

    performance_df = pd.DataFrame(performance_results)
    print("\nPerformance comparison:")
    print(performance_df)

    return performance_df

# Example usage (uncomment to run)
# performance_comparison = monitor_chunking_performance('large_dataset.csv')
Master Big Data Processing
Explore distributed computing with Dask, learn memory profiling techniques, and discover scalable data pipeline architectures.
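As a taste of where that leads, Dask offers a pandas-like API that applies the same chunked idea across all cores or a cluster. A minimal sketch, assuming dask[dataframe] is installed and reusing the demo file from this article:

import dask.dataframe as dd

# Dask splits the CSV into partitions and builds a lazy task graph
ddf = dd.read_csv('large_dataset.csv')
category_totals = ddf.groupby('category')['amount'].sum()
print(category_totals.compute())  # work happens here, in parallel across partitions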