Hash Tables: The Secret Weapon Behind Lightning-Fast Lookups
It was my second month at Amazon, and I was tasked with optimizing a user preference system that was taking 30 seconds to load. The existing solution? A massive array that we searched linearly for each user preference. With 50 million users and dozens of preferences each, you can imagine the performance nightmare.
My senior engineer pulled me aside and said, "Maya, meet your new best friend: the hash table. It'll turn your O(n) searches into O(1) lookups." That single optimization reduced our load time from 30 seconds to 200 milliseconds. Since then, hash tables have been my go-to solution for any performance-critical lookup scenario.
Table of Contents
- The Linear Search Horror Story
- Hash Tables: The O(1) Miracle
- How Hash Tables Actually Work
- Collision Resolution Strategies
- Real-World Applications
- Advanced Hash Table Optimizations
- Final Thoughts: Hash Tables as the Foundation of Modern Computing
The Linear Search Horror Story
Here's what our original code looked like:
# The nightmare: O(n) lookup for everything
class UserPreferencesSlowVersion:
    def __init__(self):
        self.preferences = []  # List of tuples: (user_id, preference_key, value)

    def set_preference(self, user_id, key, value):
        # Check if preference already exists
        for i, (uid, pkey, pvalue) in enumerate(self.preferences):
            if uid == user_id and pkey == key:
                self.preferences[i] = (user_id, key, value)
                return
        # Add new preference
        self.preferences.append((user_id, key, value))

    def get_preference(self, user_id, key):
        # Linear search through all preferences - O(n) time!
        for uid, pkey, pvalue in self.preferences:
            if uid == user_id and pkey == key:
                return pvalue
        return None

    def get_user_preferences(self, user_id):
        # Even worse - O(n) for each preference!
        user_prefs = {}
        for uid, pkey, pvalue in self.preferences:
            if uid == user_id:
                user_prefs[pkey] = pvalue
        return user_prefs

# Performance test with realistic data
import time
import random

slow_system = UserPreferencesSlowVersion()

# Simulate 100,000 users with 10 preferences each.
# Warning: because set_preference scans the whole list on every insert,
# loading a million records this way is itself O(n^2) and can take a very
# long time; shrink the ranges if you just want to see the trend.
print("Loading preferences...")
start_time = time.time()
for user_id in range(100000):
    for pref_num in range(10):
        slow_system.set_preference(
            user_id,
            f"preference_{pref_num}",
            random.choice(['enabled', 'disabled'])
        )
load_time = time.time() - start_time
print(f"Load time: {load_time:.2f} seconds")

# Test lookup performance
print("Testing lookup performance...")
start_time = time.time()
for _ in range(1000):
    random_user = random.randint(0, 99999)
    slow_system.get_preference(random_user, "preference_5")
lookup_time = time.time() - start_time
print(f"1000 lookups took: {lookup_time:.2f} seconds")
This code was a performance disaster. Every lookup required scanning through potentially millions of records. Enter hash tables.
Hash Tables: The O(1) Miracle
Here's how I rewrote the system using hash tables:
# The hash table solution: O(1) average case lookups
class UserPreferencesFastVersion:
    def __init__(self):
        # Python dict IS a hash table!
        self.preferences = {}  # user_id -> {preference_key: value}

    def set_preference(self, user_id, key, value):
        if user_id not in self.preferences:
            self.preferences[user_id] = {}
        self.preferences[user_id][key] = value

    def get_preference(self, user_id, key):
        user_prefs = self.preferences.get(user_id, {})
        return user_prefs.get(key)

    def get_user_preferences(self, user_id):
        return self.preferences.get(user_id, {}).copy()

# Performance comparison
fast_system = UserPreferencesFastVersion()

print("Loading preferences with hash table...")
start_time = time.time()
for user_id in range(100000):
    for pref_num in range(10):
        fast_system.set_preference(
            user_id,
            f"preference_{pref_num}",
            random.choice(['enabled', 'disabled'])
        )
load_time = time.time() - start_time
print(f"Load time: {load_time:.2f} seconds")

print("Testing lookup performance...")
start_time = time.time()
for _ in range(1000):
    random_user = random.randint(0, 99999)
    fast_system.get_preference(random_user, "preference_5")
lookup_time = time.time() - start_time
print(f"1000 lookups took: {lookup_time:.2f} seconds")
The results were dramatic:
- Linear search: 1000 lookups in ~15 seconds
- Hash table: 1000 lookups in ~0.001 seconds
That's a 15,000x performance improvement!
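Don't want to wait on the full million-record load to reproduce the gap yourself? Here's a quick sketch of my own (not the original benchmark; the as_list, as_dict, list_lookup, and dict_lookup names are just shorthand I made up) that uses timeit on a much smaller dataset. Exact numbers depend on your machine, but the shape holds: list scans grow with the data, dict lookups stay flat.

import timeit
import random

# Small dataset, built both ways: flat list of tuples vs nested dict
n_users = 5000
as_list = [(uid, f"preference_{p}", 'enabled')
           for uid in range(n_users) for p in range(10)]
as_dict = {}
for uid, key, value in as_list:
    as_dict.setdefault(uid, {})[key] = value

def list_lookup(uid, key):
    # O(n): scan every record until we find a match
    for u, k, v in as_list:
        if u == uid and k == key:
            return v
    return None

def dict_lookup(uid, key):
    # O(1) average: two hash lookups
    return as_dict.get(uid, {}).get(key)

uid, key = random.randrange(n_users), "preference_5"
print("list scan:", timeit.timeit(lambda: list_lookup(uid, key), number=200))
print("dict get: ", timeit.timeit(lambda: dict_lookup(uid, key), number=200))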
How Hash Tables Actually Work
The Magic of Hash Functions
# Understanding hash functions
def simple_hash(key, table_size):
    """A simple hash function for educational purposes"""
    if isinstance(key, str):
        # Sum ASCII values of characters
        hash_value = sum(ord(char) for char in key)
    elif isinstance(key, int):
        hash_value = key
    else:
        hash_value = hash(key)
    # Map to table size using modulo
    return hash_value % table_size

# Example usage
table_size = 10
print(f"Hash of 'maya': {simple_hash('maya', table_size)}")
print(f"Hash of 'coffee': {simple_hash('coffee', table_size)}")
print(f"Hash of 42: {simple_hash(42, table_size)}")

# Better hash function using Python's built-in.
# Note: Python salts string hashes per process (see PYTHONHASHSEED),
# so the exact buckets for string keys will differ between runs.
def better_hash(key, table_size):
    """Using Python's built-in hash function"""
    return hash(key) % table_size

# Hash distribution test
def test_hash_distribution(hash_func, keys, table_size):
    """Test how evenly a hash function distributes keys"""
    distribution = [0] * table_size
    for key in keys:
        bucket = hash_func(key, table_size)
        distribution[bucket] += 1
    print(f"Hash distribution: {distribution}")
    print(f"Min bucket: {min(distribution)}, Max bucket: {max(distribution)}")
    # Calculate standard deviation for evenness
    mean = len(keys) / table_size
    variance = sum((count - mean) ** 2 for count in distribution) / table_size
    std_dev = variance ** 0.5
    print(f"Standard deviation: {std_dev:.2f}")

# Test with different types of keys
test_keys = [
    'maya', 'coffee', 'seattle', 'python', 'hash', 'table',
    'amazon', 'microsoft', 'coding', 'debug', 'algorithm', 'data'
]

print("Simple hash function:")
test_hash_distribution(simple_hash, test_keys, 7)
print("\nBetter hash function:")
test_hash_distribution(better_hash, test_keys, 7)
Building a Hash Table from Scratch
class HashTable:
    """A hash table implementation with separate chaining for collision resolution"""

    def __init__(self, initial_capacity=8):
        self.capacity = initial_capacity
        self.size = 0
        self.buckets = [[] for _ in range(self.capacity)]
        self.load_factor_threshold = 0.75

    def _hash(self, key):
        """Hash function using Python's built-in hash"""
        return hash(key) % self.capacity

    def _resize(self):
        """Resize the hash table when load factor exceeds threshold"""
        print(f"Resizing from {self.capacity} to {self.capacity * 2}")
        old_buckets = self.buckets
        self.capacity *= 2
        self.size = 0
        self.buckets = [[] for _ in range(self.capacity)]
        # Rehash all existing key-value pairs
        for bucket in old_buckets:
            for key, value in bucket:
                self.put(key, value)

    def put(self, key, value):
        """Insert or update a key-value pair"""
        bucket_index = self._hash(key)
        bucket = self.buckets[bucket_index]
        # Check if key already exists
        for i, (k, v) in enumerate(bucket):
            if k == key:
                bucket[i] = (key, value)  # Update existing
                return
        # Add new key-value pair
        bucket.append((key, value))
        self.size += 1
        # Check if resize is needed
        if self.size > self.capacity * self.load_factor_threshold:
            self._resize()

    def get(self, key):
        """Retrieve value for a given key"""
        bucket_index = self._hash(key)
        bucket = self.buckets[bucket_index]
        for k, v in bucket:
            if k == key:
                return v
        raise KeyError(f"Key '{key}' not found")

    def delete(self, key):
        """Remove a key-value pair"""
        bucket_index = self._hash(key)
        bucket = self.buckets[bucket_index]
        for i, (k, v) in enumerate(bucket):
            if k == key:
                del bucket[i]
                self.size -= 1
                return v
        raise KeyError(f"Key '{key}' not found")

    def contains(self, key):
        """Check if key exists in hash table"""
        try:
            self.get(key)
            return True
        except KeyError:
            return False

    def keys(self):
        """Return all keys in the hash table"""
        result = []
        for bucket in self.buckets:
            for key, _ in bucket:
                result.append(key)
        return result

    def values(self):
        """Return all values in the hash table"""
        result = []
        for bucket in self.buckets:
            for _, value in bucket:
                result.append(value)
        return result

    def items(self):
        """Return all key-value pairs"""
        result = []
        for bucket in self.buckets:
            for key, value in bucket:
                result.append((key, value))
        return result

    def load_factor(self):
        """Calculate current load factor"""
        return self.size / self.capacity

    def stats(self):
        """Get statistics about the hash table"""
        bucket_sizes = [len(bucket) for bucket in self.buckets]
        return {
            'capacity': self.capacity,
            'size': self.size,
            'load_factor': self.load_factor(),
            'empty_buckets': bucket_sizes.count(0),
            'max_bucket_size': max(bucket_sizes) if bucket_sizes else 0,
            'bucket_size_distribution': bucket_sizes
        }

    def __str__(self):
        """String representation for debugging"""
        result = []
        for i, bucket in enumerate(self.buckets):
            if bucket:
                items = [f"{k}: {v}" for k, v in bucket]
                result.append(f"Bucket {i}: [{', '.join(items)}]")
        return '\n'.join(result)

# Test the hash table implementation
ht = HashTable(4)  # Start small to see resizing

print("Testing hash table implementation:")
print(f"Initial stats: {ht.stats()}")

# Add some key-value pairs
coffee_orders = [
    ('maya', 'flat_white'),
    ('alice', 'cappuccino'),
    ('bob', 'americano'),
    ('charlie', 'latte'),
    ('diana', 'espresso'),
    ('eve', 'macchiato')
]

for name, order in coffee_orders:
    ht.put(name, order)
    print(f"Added {name}: {order}")
    print(f"Load factor: {ht.load_factor():.2f}")

print(f"\nFinal hash table:")
print(ht)
print(f"\nFinal stats: {ht.stats()}")

# Test retrieval
print(f"\nMaya's order: {ht.get('maya')}")
print(f"Bob's order: {ht.get('bob')}")

# Test all operations
print(f"\nAll customers: {ht.keys()}")
print(f"All orders: {ht.values()}")
Collision Resolution Strategies
Separate Chaining vs Open Addressing
With separate chaining, each bucket holds a short list of every entry that hashed to it; with open addressing, colliding entries are stored in other slots of the same array and found by probing.
class OpenAddressingHashTable:
    """Hash table using linear probing for collision resolution"""

    def __init__(self, initial_capacity=8):
        self.capacity = initial_capacity
        self.size = 0
        self.keys = [None] * self.capacity
        self.values = [None] * self.capacity
        self.deleted = [False] * self.capacity  # Track deleted slots (tombstones)
        self.load_factor_threshold = 0.5  # Lower for open addressing

    def _hash(self, key):
        return hash(key) % self.capacity

    def _probe(self, key):
        """Find the appropriate slot for a key using linear probing.
        Remembers the first tombstone so deleted slots get reused without
        creating a second copy of a key that already exists later in the
        probe sequence."""
        index = self._hash(key)
        first_tombstone = None
        for _ in range(self.capacity):
            if self.keys[index] is None:
                # Truly empty slot: the key is not in the table
                return index if first_tombstone is None else first_tombstone
            if self.deleted[index]:
                if first_tombstone is None:
                    first_tombstone = index
            elif self.keys[index] == key:
                # Key already exists
                return index
            # Linear probing: move to next slot
            index = (index + 1) % self.capacity
        # We scanned the whole table without finding an empty slot
        if first_tombstone is not None:
            return first_tombstone
        raise Exception("Hash table is full")

    def _resize(self):
        """Resize and rehash all elements"""
        old_keys = self.keys
        old_values = self.values
        old_deleted = self.deleted
        self.capacity *= 2
        self.size = 0
        self.keys = [None] * self.capacity
        self.values = [None] * self.capacity
        self.deleted = [False] * self.capacity
        # Rehash all non-deleted elements
        for i in range(len(old_keys)):
            if old_keys[i] is not None and not old_deleted[i]:
                self.put(old_keys[i], old_values[i])

    def put(self, key, value):
        if self.size >= self.capacity * self.load_factor_threshold:
            self._resize()
        index = self._probe(key)
        if self.keys[index] is None or self.deleted[index]:
            # New key
            self.size += 1
        self.keys[index] = key
        self.values[index] = value
        self.deleted[index] = False

    def get(self, key):
        index = self._hash(key)
        original_index = index
        while self.keys[index] is not None:
            if self.keys[index] == key and not self.deleted[index]:
                return self.values[index]
            index = (index + 1) % self.capacity
            if index == original_index:
                break
        raise KeyError(f"Key '{key}' not found")

    def delete(self, key):
        index = self._hash(key)
        original_index = index
        while self.keys[index] is not None:
            if self.keys[index] == key and not self.deleted[index]:
                self.deleted[index] = True
                self.size -= 1
                return self.values[index]
            index = (index + 1) % self.capacity
            if index == original_index:
                break
        raise KeyError(f"Key '{key}' not found")

    def stats(self):
        used_slots = sum(1 for i in range(self.capacity)
                         if self.keys[i] is not None and not self.deleted[i])
        deleted_slots = sum(1 for i in range(self.capacity) if self.deleted[i])
        return {
            'capacity': self.capacity,
            'size': self.size,
            'used_slots': used_slots,
            'deleted_slots': deleted_slots,
            'load_factor': self.size / self.capacity
        }

# Performance comparison between collision resolution methods
import time

def performance_test(hash_table_class, name, operations=10000):
    print(f"\nTesting {name}:")
    ht = hash_table_class()

    # Insert test
    start_time = time.time()
    for i in range(operations):
        ht.put(f"key_{i}", f"value_{i}")
    insert_time = time.time() - start_time

    # Lookup test
    start_time = time.time()
    for i in range(operations):
        ht.get(f"key_{i}")
    lookup_time = time.time() - start_time

    print(f"Insert time: {insert_time:.4f}s")
    print(f"Lookup time: {lookup_time:.4f}s")
    print(f"Stats: {ht.stats()}")

# Compare performance
performance_test(HashTable, "Separate Chaining", 5000)
performance_test(OpenAddressingHashTable, "Linear Probing", 5000)
Real-World Applications
Caching with LRU Implementation
from collections import OrderedDict
import time

class LRUCache:
    """Least Recently Used cache using hash table + doubly linked list"""

    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()  # Maintains insertion/access order
        self.access_count = 0
        self.hit_count = 0

    def get(self, key):
        self.access_count += 1
        if key in self.cache:
            # Move to end (most recently used)
            value = self.cache.pop(key)
            self.cache[key] = value
            self.hit_count += 1
            return value
        return None  # Cache miss

    def put(self, key, value):
        if key in self.cache:
            # Update existing key
            self.cache.pop(key)
        elif len(self.cache) >= self.capacity:
            # Remove least recently used (first item)
            self.cache.popitem(last=False)
        self.cache[key] = value

    def hit_rate(self):
        if self.access_count == 0:
            return 0
        return self.hit_count / self.access_count

    def stats(self):
        return {
            'size': len(self.cache),
            'capacity': self.capacity,
            'access_count': self.access_count,
            'hit_count': self.hit_count,
            'hit_rate': f"{self.hit_rate():.2%}"
        }

# Database query cache example
class DatabaseQueryCache:
    def __init__(self, cache_size=100):
        self.cache = LRUCache(cache_size)
        self.query_times = {}

    def execute_query(self, sql_query):
        """Simulate database query with caching"""
        cache_key = hash(sql_query)

        # Check cache first
        cached_result = self.cache.get(cache_key)
        if cached_result is not None:
            print(f"Cache HIT for query: {sql_query[:50]}...")
            return cached_result

        # Cache miss - execute actual query
        print(f"Cache MISS for query: {sql_query[:50]}...")
        start_time = time.time()
        # Simulate database query execution
        time.sleep(0.1)  # Simulate 100ms query time
        result = f"Result for: {sql_query}"
        query_time = time.time() - start_time
        self.query_times[cache_key] = query_time

        # Store in cache
        self.cache.put(cache_key, result)
        return result

# Test the query cache
db_cache = DatabaseQueryCache(cache_size=5)

queries = [
    "SELECT * FROM users WHERE city = 'Seattle'",
    "SELECT * FROM orders WHERE date > '2024-01-01'",
    "SELECT COUNT(*) FROM products WHERE category = 'coffee'",
    "SELECT * FROM users WHERE city = 'Seattle'",  # Duplicate
    "SELECT * FROM orders WHERE status = 'pending'",
    "SELECT * FROM users WHERE city = 'Seattle'",  # Duplicate again
]

print("Testing database query cache:")
for query in queries:
    result = db_cache.execute_query(query)

print(f"\nCache statistics: {db_cache.cache.stats()}")
Hash-Based Data Deduplication
import hashlib
import json
import time

class DataDeduplicator:
    """Remove duplicate data using content-based hashing"""

    def __init__(self):
        self.content_hashes = set()
        self.hash_to_content = {}
        self.duplicate_count = 0

    def _compute_hash(self, data):
        """Compute SHA-256 hash of data"""
        if isinstance(data, dict):
            # Sort keys for consistent hashing
            content = json.dumps(data, sort_keys=True)
        elif isinstance(data, (list, tuple)):
            content = json.dumps(sorted(data))
        else:
            content = str(data)
        return hashlib.sha256(content.encode()).hexdigest()

    def add_data(self, data, identifier=None):
        """Add data, returning True if it's new, False if duplicate"""
        content_hash = self._compute_hash(data)
        if content_hash in self.content_hashes:
            self.duplicate_count += 1
            return False  # Duplicate
        self.content_hashes.add(content_hash)
        self.hash_to_content[content_hash] = {
            'data': data,
            'identifier': identifier,
            'first_seen': time.time()
        }
        return True  # New data

    def get_unique_data(self):
        """Return all unique data items"""
        return [item['data'] for item in self.hash_to_content.values()]

    def stats(self):
        total = len(self.content_hashes) + self.duplicate_count
        return {
            'unique_items': len(self.content_hashes),
            'duplicates_found': self.duplicate_count,
            'deduplication_ratio': f"{self.duplicate_count / total:.2%}" if total > 0 else "0%"
        }

# Example: Email deduplication system
class EmailDeduplicator:
    def __init__(self):
        self.deduplicator = DataDeduplicator()
        self.emails_processed = 0

    def process_email(self, email_data):
        """Process an email, detecting duplicates"""
        self.emails_processed += 1
        # Create a normalized version for deduplication
        normalized_email = {
            'from': email_data.get('from', '').lower().strip(),
            'to': [addr.lower().strip() for addr in email_data.get('to', [])],
            'subject': email_data.get('subject', '').strip(),
            'body': email_data.get('body', '').strip(),
            # Exclude timestamp and message_id from deduplication
        }
        is_unique = self.deduplicator.add_data(
            normalized_email,
            identifier=email_data.get('message_id')
        )
        if is_unique:
            print(f"✅ New email: {email_data.get('subject', 'No subject')}")
            return True
        else:
            print(f"🔄 Duplicate email: {email_data.get('subject', 'No subject')}")
            return False

# Test email deduplication
email_dedup = EmailDeduplicator()

emails = [
    {
        'message_id': '1',
        'from': 'maya@coffee.com',
        'to': ['team@company.com'],
        'subject': 'Weekly Coffee Meeting',
        'body': 'Let\'s discuss our coffee strategy.',
        'timestamp': '2024-01-01 10:00:00'
    },
    {
        'message_id': '2',
        'from': 'Maya@Coffee.com',  # Different case
        'to': ['team@company.com'],
        'subject': 'Weekly Coffee Meeting',
        'body': 'Let\'s discuss our coffee strategy.',
        'timestamp': '2024-01-01 10:05:00'  # Different timestamp
    },
    {
        'message_id': '3',
        'from': 'alice@company.com',
        'to': ['maya@coffee.com'],
        'subject': 'Re: Coffee Meeting',
        'body': 'Sounds great!',
        'timestamp': '2024-01-01 11:00:00'
    },
    {
        'message_id': '4',
        'from': 'maya@coffee.com',
        'to': ['team@company.com'],
        'subject': 'Weekly Coffee Meeting',
        'body': 'Let\'s discuss our coffee strategy.',
        'timestamp': '2024-01-02 10:00:00'  # Same content, different day
    }
]

print("Processing emails for deduplication:")
for email in emails:
    email_dedup.process_email(email)

print(f"\nDeduplication stats: {email_dedup.deduplicator.stats()}")
Advanced Hash Table Optimizations
Robin Hood Hashing
Robin Hood hashing is open addressing with one extra rule: during insertion, an entry that has probed far from its home slot may displace one sitting closer to home, which keeps probe lengths short and evenly distributed.
class RobinHoodHashTable:
    """Hash table using Robin Hood hashing for better performance"""

    def __init__(self, initial_capacity=8):
        self.capacity = initial_capacity
        self.size = 0
        self.keys = [None] * self.capacity
        self.values = [None] * self.capacity
        self.distances = [0] * self.capacity  # Distance from ideal position

    def _hash(self, key):
        return hash(key) % self.capacity

    def _probe_distance(self, index, key):
        """Calculate how far a key is from its ideal position"""
        ideal_index = self._hash(key)
        if index >= ideal_index:
            return index - ideal_index
        else:
            return self.capacity - ideal_index + index

    def put(self, key, value):
        if self.size >= self.capacity * 0.9:  # Resize at 90% capacity
            self._resize()
        index = self._hash(key)
        distance = 0
        while True:
            if self.keys[index] is None:
                # Empty slot - insert here
                self.keys[index] = key
                self.values[index] = value
                self.distances[index] = distance
                self.size += 1
                return
            if self.keys[index] == key:
                # Update existing key
                self.values[index] = value
                return
            # Robin Hood: if current entry has traveled less distance,
            # swap with it and continue inserting the displaced entry
            if distance > self.distances[index]:
                # Swap entries
                self.keys[index], key = key, self.keys[index]
                self.values[index], value = value, self.values[index]
                self.distances[index], distance = distance, self.distances[index]
            index = (index + 1) % self.capacity
            distance += 1

    def get(self, key):
        index = self._hash(key)
        distance = 0
        while self.keys[index] is not None:
            if self.keys[index] == key:
                return self.values[index]
            # If we've traveled further than the stored distance,
            # the key doesn't exist
            if distance > self.distances[index]:
                break
            index = (index + 1) % self.capacity
            distance += 1
        raise KeyError(f"Key '{key}' not found")

    def _resize(self):
        old_keys = self.keys
        old_values = self.values
        self.capacity *= 2
        self.size = 0
        self.keys = [None] * self.capacity
        self.values = [None] * self.capacity
        self.distances = [0] * self.capacity
        for i in range(len(old_keys)):
            if old_keys[i] is not None:
                self.put(old_keys[i], old_values[i])

    def probe_stats(self):
        """Analyze probe distances for performance insights"""
        used_slots = [i for i in range(self.capacity) if self.keys[i] is not None]
        distances = [self.distances[i] for i in used_slots]
        if not distances:
            return {'avg_distance': 0, 'max_distance': 0}
        return {
            'avg_distance': sum(distances) / len(distances),
            'max_distance': max(distances),
            'distance_distribution': distances
        }
Consistent Hashing for Distributed Systems
Consistent hashing places both keys and nodes on the same hash ring, so adding or removing a node only remaps the keys in that node's neighborhood instead of reshuffling the entire keyspace.
import bisect
import hashlib

class ConsistentHashRing:
    """Consistent hashing implementation for distributed systems"""

    def __init__(self, nodes=None, replicas=3):
        self.replicas = replicas
        self.ring = {}  # hash_value -> node
        self.sorted_hashes = []
        if nodes:
            for node in nodes:
                self.add_node(node)

    def _hash(self, key):
        """Generate hash value for a key"""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node):
        """Add a node to the hash ring"""
        for i in range(self.replicas):
            replica_key = f"{node}:{i}"
            hash_value = self._hash(replica_key)
            self.ring[hash_value] = node
            bisect.insort(self.sorted_hashes, hash_value)

    def remove_node(self, node):
        """Remove a node from the hash ring"""
        for i in range(self.replicas):
            replica_key = f"{node}:{i}"
            hash_value = self._hash(replica_key)
            if hash_value in self.ring:
                del self.ring[hash_value]
                self.sorted_hashes.remove(hash_value)

    def get_node(self, key):
        """Find which node should handle this key"""
        if not self.ring:
            return None
        hash_value = self._hash(key)
        # Find the first node hash >= our key hash
        index = bisect.bisect_right(self.sorted_hashes, hash_value)
        # If we're past the end, wrap around to the beginning
        if index == len(self.sorted_hashes):
            index = 0
        return self.ring[self.sorted_hashes[index]]

    def get_nodes(self, key, count=1):
        """Get multiple distinct nodes for replication"""
        if not self.ring or count <= 0:
            return []
        hash_value = self._hash(key)
        index = bisect.bisect_right(self.sorted_hashes, hash_value)
        nodes = []
        seen_nodes = set()
        unique_node_count = len(set(self.ring.values()))
        # Walk the ring until we have enough distinct nodes;
        # virtual replicas of a node we've already picked are skipped
        while len(nodes) < count:
            if index >= len(self.sorted_hashes):
                index = 0
            node = self.ring[self.sorted_hashes[index]]
            if node not in seen_nodes:
                nodes.append(node)
                seen_nodes.add(node)
            index += 1
            # Stop if the ring doesn't have enough unique nodes
            if len(seen_nodes) == unique_node_count:
                break
        return nodes

# Example: Distributed cache system
class DistributedCache:
    def __init__(self, cache_nodes):
        self.hash_ring = ConsistentHashRing(cache_nodes)
        self.node_caches = {node: {} for node in cache_nodes}
        self.stats = {node: {'hits': 0, 'misses': 0} for node in cache_nodes}

    def put(self, key, value):
        """Store a key-value pair in the appropriate cache node"""
        node = self.hash_ring.get_node(key)
        if node:
            self.node_caches[node][key] = value
            print(f"Stored '{key}' on node {node}")

    def get(self, key):
        """Retrieve a value from the appropriate cache node"""
        node = self.hash_ring.get_node(key)
        if node and key in self.node_caches[node]:
            self.stats[node]['hits'] += 1
            print(f"Cache HIT for '{key}' on node {node}")
            return self.node_caches[node][key]
        if node:
            self.stats[node]['misses'] += 1
            print(f"Cache MISS for '{key}' on node {node}")
        return None

    def add_cache_node(self, node):
        """Add a new cache node and rebalance"""
        print(f"Adding new cache node: {node}")
        # Store current data
        all_data = {}
        for node_cache in self.node_caches.values():
            all_data.update(node_cache)
        # Add new node
        self.hash_ring.add_node(node)
        self.node_caches[node] = {}
        self.stats[node] = {'hits': 0, 'misses': 0}
        # Clear all caches and redistribute
        for cache in self.node_caches.values():
            cache.clear()
        # Redistribute all data
        for key, value in all_data.items():
            self.put(key, value)

    def get_cache_stats(self):
        """Get statistics for all cache nodes"""
        return {
            node: {
                'items': len(cache),
                'hits': self.stats[node]['hits'],
                'misses': self.stats[node]['misses'],
                'hit_rate': f"{self.stats[node]['hits'] / max(1, self.stats[node]['hits'] + self.stats[node]['misses']):.2%}"
            }
            for node, cache in self.node_caches.items()
        }

# Test distributed cache
cache_nodes = ['server1', 'server2', 'server3']
dist_cache = DistributedCache(cache_nodes)

# Add some data
cache_data = [
    ('user:1', {'name': 'Maya', 'city': 'Seattle'}),
    ('user:2', {'name': 'Alice', 'city': 'Portland'}),
    ('order:100', {'total': 45.50, 'items': 3}),
    ('product:coffee', {'price': 4.50, 'stock': 100}),
    ('session:abc123', {'user_id': 1, 'expires': '2024-01-01'})
]

print("Storing data in distributed cache:")
for key, value in cache_data:
    dist_cache.put(key, value)

print("\nRetrieving data:")
for key, _ in cache_data:
    result = dist_cache.get(key)

print(f"\nCache statistics before adding node:")
for node, stats in dist_cache.get_cache_stats().items():
    print(f"{node}: {stats}")

# Add a new cache node
dist_cache.add_cache_node('server4')

print(f"\nCache statistics after adding node:")
for node, stats in dist_cache.get_cache_stats().items():
    print(f"{node}: {stats}")
Final Thoughts: Hash Tables as the Foundation of Modern Computing
That 30-second-to-200-millisecond optimization at Amazon opened my eyes to the incredible power of choosing the right data structure. Hash tables aren't just an academic concept - they're the invisible foundation powering everything from database indexes to web caches to distributed systems.
Every time you use a Python dictionary, JavaScript object, or Java HashMap, you're leveraging decades of hash table optimization. Understanding how they work under the hood makes you a better developer, whether you're optimizing database queries, building caching systems, or designing distributed architectures.
The beauty of hash tables lies in their simplicity: transform any key into an array index, and you get O(1) average-case lookups. The complexity comes in handling collisions gracefully, maintaining good performance as data grows, and adapting to different use cases.
Whether you're building a simple user preference system or a distributed cache spanning multiple continents, hash tables provide the performance characteristics that make modern applications possible. Master them, and you'll never look at performance problems the same way again.
Currently writing this from Analog Coffee in Capitol Hill, where I'm optimizing a hash-based deduplication system while enjoying my usual cortado. Share your hash table optimization stories @maya_codes_pnw - we've all had those performance breakthrough moments! ⚡☕