Master hash tables from a developer who optimized lookup performance at Amazon, covering implementation, collision handling, and real-world applications in modern programming.

Hash Tables: The Secret Weapon Behind Lightning-Fast Lookups

It was my second month at Amazon, and I was tasked with optimizing a user preference system that was taking 30 seconds to load. The existing solution? A massive array that we searched linearly for each user preference. With 50 million users and dozens of preferences each, you can imagine the performance nightmare.

My senior engineer pulled me aside and said, "Maya, meet your new best friend: the hash table. It'll turn your O(n) searches into O(1) lookups." That single optimization reduced our load time from 30 seconds to 200 milliseconds. Since then, hash tables have been my go-to solution for any performance-critical lookup scenario.

The Linear Search Horror Story

Here's what our original code looked like:

# The nightmare: O(n) lookup for everything
class UserPreferencesSlowVersion:
    def __init__(self):
        self.preferences = []  # List of tuples: (user_id, preference_key, value)
    
    def set_preference(self, user_id, key, value):
        # Check if preference already exists
        for i, (uid, pkey, pvalue) in enumerate(self.preferences):
            if uid == user_id and pkey == key:
                self.preferences[i] = (user_id, key, value)
                return
        
        # Add new preference
        self.preferences.append((user_id, key, value))
    
    def get_preference(self, user_id, key):
        # Linear search through all preferences - O(n) time!
        for uid, pkey, pvalue in self.preferences:
            if uid == user_id and pkey == key:
                return pvalue
        return None
    
    def get_user_preferences(self, user_id):
        # Still O(n): we scan every record just to collect one user's preferences
        user_prefs = {}
        for uid, pkey, pvalue in self.preferences:
            if uid == user_id:
                user_prefs[pkey] = pvalue
        return user_prefs

# Performance test with realistic data
import time
import random

slow_system = UserPreferencesSlowVersion()

# Simulate 100,000 users with 10 preferences each.
# Warning: because set_preference rescans the whole list on every call,
# this load loop is O(n^2) overall - shrink the user count if you
# actually want it to finish in a reasonable time.
print("Loading preferences...")
start_time = time.time()

for user_id in range(100000):
    for pref_num in range(10):
        slow_system.set_preference(
            user_id, 
            f"preference_{pref_num}", 
            random.choice(['enabled', 'disabled'])
        )

load_time = time.time() - start_time
print(f"Load time: {load_time:.2f} seconds")

# Test lookup performance
print("Testing lookup performance...")
start_time = time.time()

for _ in range(1000):
    random_user = random.randint(0, 99999)
    slow_system.get_preference(random_user, "preference_5")

lookup_time = time.time() - start_time
print(f"1000 lookups took: {lookup_time:.2f} seconds")

This code was a performance disaster. Every lookup required scanning through potentially millions of records. Enter hash tables.

Hash Tables: The O(1) Miracle

Here's how I rewrote the system using hash tables:

# The hash table solution: O(1) average case lookups
class UserPreferencesFastVersion:
    def __init__(self):
        # Python dict IS a hash table!
        self.preferences = {}  # user_id -> {preference_key: value}
    
    def set_preference(self, user_id, key, value):
        if user_id not in self.preferences:
            self.preferences[user_id] = {}
        
        self.preferences[user_id][key] = value
    
    def get_preference(self, user_id, key):
        user_prefs = self.preferences.get(user_id, {})
        return user_prefs.get(key)
    
    def get_user_preferences(self, user_id):
        return self.preferences.get(user_id, {}).copy()

# Performance comparison
fast_system = UserPreferencesFastVersion()

print("Loading preferences with hash table...")
start_time = time.time()

for user_id in range(100000):
    for pref_num in range(10):
        fast_system.set_preference(
            user_id, 
            f"preference_{pref_num}", 
            random.choice(['enabled', 'disabled'])
        )

load_time = time.time() - start_time
print(f"Load time: {load_time:.2f} seconds")

print("Testing lookup performance...")
start_time = time.time()

for _ in range(1000):
    random_user = random.randint(0, 99999)
    fast_system.get_preference(random_user, "preference_5")

lookup_time = time.time() - start_time
print(f"1000 lookups took: {lookup_time:.2f} seconds")

The results were dramatic:

  • Linear search: 1000 lookups in ~15 seconds
  • Hash table: 1000 lookups in ~0.001 seconds

That's a 15,000x performance improvement!

How Hash Tables Actually Work

The Magic of Hash Functions

# Understanding hash functions
def simple_hash(key, table_size):
    """A simple hash function for educational purposes"""
    if isinstance(key, str):
        # Sum ASCII values of characters
        hash_value = sum(ord(char) for char in key)
    elif isinstance(key, int):
        hash_value = key
    else:
        hash_value = hash(key)
    
    # Map to table size using modulo
    return hash_value % table_size

# Example usage
table_size = 10
print(f"Hash of 'maya': {simple_hash('maya', table_size)}")
print(f"Hash of 'coffee': {simple_hash('coffee', table_size)}")
print(f"Hash of 42: {simple_hash(42, table_size)}")

# Better hash function using Python's built-in
def better_hash(key, table_size):
    """Using Python's built-in hash function"""
    return hash(key) % table_size

# Hash distribution test
def test_hash_distribution(hash_func, keys, table_size):
    """Test how evenly a hash function distributes keys"""
    distribution = [0] * table_size
    
    for key in keys:
        bucket = hash_func(key, table_size)
        distribution[bucket] += 1
    
    print(f"Hash distribution: {distribution}")
    print(f"Min bucket: {min(distribution)}, Max bucket: {max(distribution)}")
    
    # Calculate standard deviation for evenness
    mean = len(keys) / table_size
    variance = sum((count - mean) ** 2 for count in distribution) / table_size
    std_dev = variance ** 0.5
    print(f"Standard deviation: {std_dev:.2f}")

# Test with different types of keys
test_keys = [
    'maya', 'coffee', 'seattle', 'python', 'hash', 'table',
    'amazon', 'microsoft', 'coding', 'debug', 'algorithm', 'data'
]

print("Simple hash function:")
test_hash_distribution(simple_hash, test_keys, 7)

print("\nBetter hash function:")
test_hash_distribution(better_hash, test_keys, 7)

Building a Hash Table from Scratch

class HashTable:
    """A hash table implementation with separate chaining for collision resolution"""
    
    def __init__(self, initial_capacity=8):
        self.capacity = initial_capacity
        self.size = 0
        self.buckets = [[] for _ in range(self.capacity)]
        self.load_factor_threshold = 0.75
    
    def _hash(self, key):
        """Hash function using Python's built-in hash"""
        return hash(key) % self.capacity
    
    def _resize(self):
        """Resize the hash table when load factor exceeds threshold"""
        print(f"Resizing from {self.capacity} to {self.capacity * 2}")
        
        old_buckets = self.buckets
        self.capacity *= 2
        self.size = 0
        self.buckets = [[] for _ in range(self.capacity)]
        
        # Rehash all existing key-value pairs
        for bucket in old_buckets:
            for key, value in bucket:
                self.put(key, value)
    
    def put(self, key, value):
        """Insert or update a key-value pair"""
        bucket_index = self._hash(key)
        bucket = self.buckets[bucket_index]
        
        # Check if key already exists
        for i, (k, v) in enumerate(bucket):
            if k == key:
                bucket[i] = (key, value)  # Update existing
                return
        
        # Add new key-value pair
        bucket.append((key, value))
        self.size += 1
        
        # Check if resize is needed
        if self.size > self.capacity * self.load_factor_threshold:
            self._resize()
    
    def get(self, key):
        """Retrieve value for a given key"""
        bucket_index = self._hash(key)
        bucket = self.buckets[bucket_index]
        
        for k, v in bucket:
            if k == key:
                return v
        
        raise KeyError(f"Key '{key}' not found")
    
    def delete(self, key):
        """Remove a key-value pair"""
        bucket_index = self._hash(key)
        bucket = self.buckets[bucket_index]
        
        for i, (k, v) in enumerate(bucket):
            if k == key:
                del bucket[i]
                self.size -= 1
                return v
        
        raise KeyError(f"Key '{key}' not found")
    
    def contains(self, key):
        """Check if key exists in hash table"""
        try:
            self.get(key)
            return True
        except KeyError:
            return False
    
    def keys(self):
        """Return all keys in the hash table"""
        result = []
        for bucket in self.buckets:
            for key, _ in bucket:
                result.append(key)
        return result
    
    def values(self):
        """Return all values in the hash table"""
        result = []
        for bucket in self.buckets:
            for _, value in bucket:
                result.append(value)
        return result
    
    def items(self):
        """Return all key-value pairs"""
        result = []
        for bucket in self.buckets:
            for key, value in bucket:
                result.append((key, value))
        return result
    
    def load_factor(self):
        """Calculate current load factor"""
        return self.size / self.capacity
    
    def stats(self):
        """Get statistics about the hash table"""
        bucket_sizes = [len(bucket) for bucket in self.buckets]
        
        return {
            'capacity': self.capacity,
            'size': self.size,
            'load_factor': self.load_factor(),
            'empty_buckets': bucket_sizes.count(0),
            'max_bucket_size': max(bucket_sizes) if bucket_sizes else 0,
            'bucket_size_distribution': bucket_sizes
        }
    
    def __str__(self):
        """String representation for debugging"""
        result = []
        for i, bucket in enumerate(self.buckets):
            if bucket:
                items = [f"{k}: {v}" for k, v in bucket]
                result.append(f"Bucket {i}: [{', '.join(items)}]")
        return '\n'.join(result)

# Test the hash table implementation
ht = HashTable(4)  # Start small to see resizing

print("Testing hash table implementation:")
print(f"Initial stats: {ht.stats()}")

# Add some key-value pairs
coffee_orders = [
    ('maya', 'flat_white'),
    ('alice', 'cappuccino'),
    ('bob', 'americano'),
    ('charlie', 'latte'),
    ('diana', 'espresso'),
    ('eve', 'macchiato')
]

for name, order in coffee_orders:
    ht.put(name, order)
    print(f"Added {name}: {order}")
    print(f"Load factor: {ht.load_factor():.2f}")

print(f"\nFinal hash table:")
print(ht)
print(f"\nFinal stats: {ht.stats()}")

# Test retrieval
print(f"\nMaya's order: {ht.get('maya')}")
print(f"Bob's order: {ht.get('bob')}")

# Test all operations
print(f"\nAll customers: {ht.keys()}")
print(f"All orders: {ht.values()}")

Collision Resolution Strategies
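
Before comparing strategies, it helps to see the problem itself. Here's a quick sketch, reusing the simple_hash function from earlier, of how different keys can land in the same bucket - a collision the table has to resolve one way or another:

# Anagrams have the same ASCII sum, so simple_hash drops them all
# into the same bucket
table_size = 10
for key in ['maya', 'ayma', 'amya']:
    print(f"simple_hash('{key}') -> bucket {simple_hash(key, table_size)}")

# All three keys map to bucket 4, so the table needs a plan for storing
# more than one entry per slot: chain entries together (separate chaining)
# or walk to another slot (open addressing)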

Separate Chaining vs Open Addressing

class OpenAddressingHashTable:
    """Hash table using linear probing for collision resolution"""
    
    def __init__(self, initial_capacity=8):
        self.capacity = initial_capacity
        self.size = 0
        self.keys = [None] * self.capacity
        self.values = [None] * self.capacity
        self.deleted = [False] * self.capacity  # Track deleted slots
        self.load_factor_threshold = 0.5  # Lower for open addressing
    
    def _hash(self, key):
        return hash(key) % self.capacity
    
    def _probe(self, key):
        """Find the slot for a key using linear probing.
        
        Returns the slot holding the key if it already exists; otherwise the
        first reusable slot (empty or tombstoned) on the probe path. We keep
        scanning past tombstones so an existing key is never duplicated."""
        index = self._hash(key)
        original_index = index
        first_tombstone = None
        
        while True:
            if self.keys[index] is None:
                # An empty slot ends the probe chain - reuse an earlier
                # tombstone if we passed one, otherwise use this slot
                return first_tombstone if first_tombstone is not None else index
            elif self.deleted[index]:
                # Remember the first tombstone, but keep looking for the key
                if first_tombstone is None:
                    first_tombstone = index
            elif self.keys[index] == key:
                # Key already exists
                return index
            
            # Linear probing: move to next slot
            index = (index + 1) % self.capacity
            
            # Prevent infinite loop
            if index == original_index:
                if first_tombstone is not None:
                    return first_tombstone
                raise Exception("Hash table is full")
    
    def _resize(self):
        """Resize and rehash all elements"""
        old_keys = self.keys
        old_values = self.values
        old_deleted = self.deleted
        
        self.capacity *= 2
        self.size = 0
        self.keys = [None] * self.capacity
        self.values = [None] * self.capacity
        self.deleted = [False] * self.capacity
        
        # Rehash all non-deleted elements
        for i in range(len(old_keys)):
            if old_keys[i] is not None and not old_deleted[i]:
                self.put(old_keys[i], old_values[i])
    
    def put(self, key, value):
        if self.size >= self.capacity * self.load_factor_threshold:
            self._resize()
        
        index = self._probe(key)
        
        if self.keys[index] is None or self.deleted[index]:
            # New key
            self.size += 1
        
        self.keys[index] = key
        self.values[index] = value
        self.deleted[index] = False
    
    def get(self, key):
        index = self._hash(key)
        original_index = index
        
        while self.keys[index] is not None:
            if self.keys[index] == key and not self.deleted[index]:
                return self.values[index]
            
            index = (index + 1) % self.capacity
            
            if index == original_index:
                break
        
        raise KeyError(f"Key '{key}' not found")
    
    def delete(self, key):
        index = self._hash(key)
        original_index = index
        
        while self.keys[index] is not None:
            if self.keys[index] == key and not self.deleted[index]:
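                # Mark a tombstone instead of clearing the slot so probe
                # chains for other keys that passed through here stay intact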
                self.deleted[index] = True
                self.size -= 1
                return self.values[index]
            
            index = (index + 1) % self.capacity
            
            if index == original_index:
                break
        
        raise KeyError(f"Key '{key}' not found")
    
    def stats(self):
        used_slots = sum(1 for i in range(self.capacity) 
                        if self.keys[i] is not None and not self.deleted[i])
        deleted_slots = sum(1 for i in range(self.capacity) if self.deleted[i])
        
        return {
            'capacity': self.capacity,
            'size': self.size,
            'used_slots': used_slots,
            'deleted_slots': deleted_slots,
            'load_factor': self.size / self.capacity
        }

# Performance comparison between collision resolution methods
import time

def performance_test(hash_table_class, name, operations=10000):
    print(f"\nTesting {name}:")
    ht = hash_table_class()
    
    # Insert test
    start_time = time.time()
    for i in range(operations):
        ht.put(f"key_{i}", f"value_{i}")
    insert_time = time.time() - start_time
    
    # Lookup test
    start_time = time.time()
    for i in range(operations):
        ht.get(f"key_{i}")
    lookup_time = time.time() - start_time
    
    print(f"Insert time: {insert_time:.4f}s")
    print(f"Lookup time: {lookup_time:.4f}s")
    print(f"Stats: {ht.stats()}")

# Compare performance
performance_test(HashTable, "Separate Chaining", 5000)
performance_test(OpenAddressingHashTable, "Linear Probing", 5000)

Real-World Applications

Caching with LRU Implementation

from collections import OrderedDict
import time

class LRUCache:
    """Least Recently Used cache using hash table + doubly linked list"""
    
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()  # Maintains insertion/access order
        self.access_count = 0
        self.hit_count = 0
    
    def get(self, key):
        self.access_count += 1
        
        if key in self.cache:
            # Move to end (most recently used)
            value = self.cache.pop(key)
            self.cache[key] = value
            self.hit_count += 1
            return value
        
        return None  # Cache miss
    
    def put(self, key, value):
        if key in self.cache:
            # Update existing key
            self.cache.pop(key)
        elif len(self.cache) >= self.capacity:
            # Remove least recently used (first item)
            self.cache.popitem(last=False)
        
        self.cache[key] = value
    
    def hit_rate(self):
        if self.access_count == 0:
            return 0
        return self.hit_count / self.access_count
    
    def stats(self):
        return {
            'size': len(self.cache),
            'capacity': self.capacity,
            'access_count': self.access_count,
            'hit_count': self.hit_count,
            'hit_rate': f"{self.hit_rate():.2%}"
        }

# Database query cache example
class DatabaseQueryCache:
    def __init__(self, cache_size=100):
        self.cache = LRUCache(cache_size)
        self.query_times = {}
    
    def execute_query(self, sql_query):
        """Simulate database query with caching"""
        cache_key = hash(sql_query)
        
        # Check cache first
        cached_result = self.cache.get(cache_key)
        if cached_result is not None:
            print(f"Cache HIT for query: {sql_query[:50]}...")
            return cached_result
        
        # Cache miss - execute actual query
        print(f"Cache MISS for query: {sql_query[:50]}...")
        start_time = time.time()
        
        # Simulate database query execution
        time.sleep(0.1)  # Simulate 100ms query time
        result = f"Result for: {sql_query}"
        
        query_time = time.time() - start_time
        self.query_times[cache_key] = query_time
        
        # Store in cache
        self.cache.put(cache_key, result)
        
        return result

# Test the query cache
db_cache = DatabaseQueryCache(cache_size=5)

queries = [
    "SELECT * FROM users WHERE city = 'Seattle'",
    "SELECT * FROM orders WHERE date > '2024-01-01'",
    "SELECT COUNT(*) FROM products WHERE category = 'coffee'",
    "SELECT * FROM users WHERE city = 'Seattle'",  # Duplicate
    "SELECT * FROM orders WHERE status = 'pending'",
    "SELECT * FROM users WHERE city = 'Seattle'",  # Duplicate again
]

print("Testing database query cache:")
for query in queries:
    result = db_cache.execute_query(query)

print(f"\nCache statistics: {db_cache.cache.stats()}")

Hash-Based Data Deduplication

import hashlib
import json
import time  # used for the first_seen timestamps below

class DataDeduplicator:
    """Remove duplicate data using content-based hashing"""
    
    def __init__(self):
        self.content_hashes = set()
        self.hash_to_content = {}
        self.duplicate_count = 0
    
    def _compute_hash(self, data):
        """Compute SHA-256 hash of data"""
        if isinstance(data, dict):
            # Sort keys for consistent hashing
            content = json.dumps(data, sort_keys=True)
        elif isinstance(data, (list, tuple)):
            content = json.dumps(sorted(data))
        else:
            content = str(data)
        
        return hashlib.sha256(content.encode()).hexdigest()
    
    def add_data(self, data, identifier=None):
        """Add data, returning True if it's new, False if duplicate"""
        content_hash = self._compute_hash(data)
        
        if content_hash in self.content_hashes:
            self.duplicate_count += 1
            return False  # Duplicate
        
        self.content_hashes.add(content_hash)
        self.hash_to_content[content_hash] = {
            'data': data,
            'identifier': identifier,
            'first_seen': time.time()
        }
        return True  # New data
    
    def get_unique_data(self):
        """Return all unique data items"""
        return [item['data'] for item in self.hash_to_content.values()]
    
    def stats(self):
        return {
            'unique_items': len(self.content_hashes),
            'duplicates_found': self.duplicate_count,
            'deduplication_ratio': f"{self.duplicate_count / (len(self.content_hashes) + self.duplicate_count):.2%}" if (len(self.content_hashes) + self.duplicate_count) > 0 else "0%"
        }

# Example: Email deduplication system
class EmailDeduplicator:
    def __init__(self):
        self.deduplicator = DataDeduplicator()
        self.emails_processed = 0
    
    def process_email(self, email_data):
        """Process an email, detecting duplicates"""
        self.emails_processed += 1
        
        # Create a normalized version for deduplication
        normalized_email = {
            'from': email_data.get('from', '').lower().strip(),
            'to': [addr.lower().strip() for addr in email_data.get('to', [])],
            'subject': email_data.get('subject', '').strip(),
            'body': email_data.get('body', '').strip(),
            # Exclude timestamp and message_id from deduplication
        }
        
        is_unique = self.deduplicator.add_data(
            normalized_email, 
            identifier=email_data.get('message_id')
        )
        
        if is_unique:
            print(f"✅ New email: {email_data.get('subject', 'No subject')}")
            return True
        else:
            print(f"🔄 Duplicate email: {email_data.get('subject', 'No subject')}")
            return False

# Test email deduplication
email_dedup = EmailDeduplicator()

emails = [
    {
        'message_id': '1',
        'from': 'maya@coffee.com',
        'to': ['team@company.com'],
        'subject': 'Weekly Coffee Meeting',
        'body': 'Let\'s discuss our coffee strategy.',
        'timestamp': '2024-01-01 10:00:00'
    },
    {
        'message_id': '2',
        'from': 'Maya@Coffee.com',  # Different case
        'to': ['team@company.com'],
        'subject': 'Weekly Coffee Meeting',
        'body': 'Let\'s discuss our coffee strategy.',
        'timestamp': '2024-01-01 10:05:00'  # Different timestamp
    },
    {
        'message_id': '3',
        'from': 'alice@company.com',
        'to': ['maya@coffee.com'],
        'subject': 'Re: Coffee Meeting',
        'body': 'Sounds great!',
        'timestamp': '2024-01-01 11:00:00'
    },
    {
        'message_id': '4',
        'from': 'maya@coffee.com',
        'to': ['team@company.com'],
        'subject': 'Weekly Coffee Meeting',
        'body': 'Let\'s discuss our coffee strategy.',
        'timestamp': '2024-01-02 10:00:00'  # Same content, different day
    }
]

print("Processing emails for deduplication:")
for email in emails:
    email_dedup.process_email(email)

print(f"\nDeduplication stats: {email_dedup.deduplicator.stats()}")

Advanced Hash Table Optimizations

Robin Hood Hashing

class RobinHoodHashTable:
    """Hash table using Robin Hood hashing for better performance"""
    
    def __init__(self, initial_capacity=8):
        self.capacity = initial_capacity
        self.size = 0
        self.keys = [None] * self.capacity
        self.values = [None] * self.capacity
        self.distances = [0] * self.capacity  # Distance from ideal position
    
    def _hash(self, key):
        return hash(key) % self.capacity
    
    def _probe_distance(self, index, key):
        """Calculate how far a key is from its ideal position"""
        ideal_index = self._hash(key)
        if index >= ideal_index:
            return index - ideal_index
        else:
            return self.capacity - ideal_index + index
    
    def put(self, key, value):
        if self.size >= self.capacity * 0.9:  # Resize at 90% capacity
            self._resize()
        
        index = self._hash(key)
        distance = 0
        
        while True:
            if self.keys[index] is None:
                # Empty slot - insert here
                self.keys[index] = key
                self.values[index] = value
                self.distances[index] = distance
                self.size += 1
                return
            
            if self.keys[index] == key:
                # Update existing key
                self.values[index] = value
                return
            
            # Robin Hood: if current entry has traveled less distance,
            # swap with it and continue inserting the displaced entry
            if distance > self.distances[index]:
                # Swap entries
                self.keys[index], key = key, self.keys[index]
                self.values[index], value = value, self.values[index]
                self.distances[index], distance = distance, self.distances[index]
            
            index = (index + 1) % self.capacity
            distance += 1
    
    def get(self, key):
        index = self._hash(key)
        distance = 0
        
        while self.keys[index] is not None:
            if self.keys[index] == key:
                return self.values[index]
            
            # If we've traveled further than the stored distance,
            # the key doesn't exist
            if distance > self.distances[index]:
                break
            
            index = (index + 1) % self.capacity
            distance += 1
        
        raise KeyError(f"Key '{key}' not found")
    
    def _resize(self):
        old_keys = self.keys
        old_values = self.values
        
        self.capacity *= 2
        self.size = 0
        self.keys = [None] * self.capacity
        self.values = [None] * self.capacity
        self.distances = [0] * self.capacity
        
        for i in range(len(old_keys)):
            if old_keys[i] is not None:
                self.put(old_keys[i], old_values[i])
    
    def probe_stats(self):
        """Analyze probe distances for performance insights"""
        used_slots = [i for i in range(self.capacity) if self.keys[i] is not None]
        distances = [self.distances[i] for i in used_slots]
        
        if not distances:
            return {'avg_distance': 0, 'max_distance': 0}
        
        return {
            'avg_distance': sum(distances) / len(distances),
            'max_distance': max(distances),
            'distance_distribution': distances
        }
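
To sanity-check the implementation above, here's a small usage example (the names and orders are just placeholder data):

rh = RobinHoodHashTable()

for name, order in [('maya', 'flat_white'), ('alice', 'cappuccino'),
                    ('bob', 'americano'), ('charlie', 'latte')]:
    rh.put(name, order)

print(f"Maya's order: {rh.get('maya')}")
print(f"Probe stats: {rh.probe_stats()}")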

Consistent Hashing for Distributed Systems

import bisect
import hashlib

class ConsistentHashRing:
    """Consistent hashing implementation for distributed systems"""
    
    def __init__(self, nodes=None, replicas=3):
        self.replicas = replicas
        self.ring = {}  # hash_value -> node
        self.sorted_hashes = []
        
        if nodes:
            for node in nodes:
                self.add_node(node)
    
    def _hash(self, key):
        """Generate hash value for a key"""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)
    
    def add_node(self, node):
        """Add a node to the hash ring"""
        for i in range(self.replicas):
            replica_key = f"{node}:{i}"
            hash_value = self._hash(replica_key)
            self.ring[hash_value] = node
            bisect.insort(self.sorted_hashes, hash_value)
    
    def remove_node(self, node):
        """Remove a node from the hash ring"""
        for i in range(self.replicas):
            replica_key = f"{node}:{i}"
            hash_value = self._hash(replica_key)
            if hash_value in self.ring:
                del self.ring[hash_value]
                self.sorted_hashes.remove(hash_value)
    
    def get_node(self, key):
        """Find which node should handle this key"""
        if not self.ring:
            return None
        
        hash_value = self._hash(key)
        
        # Find the first node hash >= our key hash
        index = bisect.bisect_right(self.sorted_hashes, hash_value)
        
        # If we're past the end, wrap around to the beginning
        if index == len(self.sorted_hashes):
            index = 0
        
        return self.ring[self.sorted_hashes[index]]
    
    def get_nodes(self, key, count=1):
        """Get multiple distinct nodes for replication"""
        if not self.ring or count <= 0:
            return []
        
        hash_value = self._hash(key)
        index = bisect.bisect_right(self.sorted_hashes, hash_value)
        
        nodes = []
        seen_nodes = set()
        unique_node_count = len(set(self.ring.values()))
        
        # Walk the ring clockwise, skipping replicas of nodes already chosen,
        # until we have `count` distinct nodes or we have seen every node
        while len(nodes) < count and len(seen_nodes) < unique_node_count:
            if index >= len(self.sorted_hashes):
                index = 0
            
            node = self.ring[self.sorted_hashes[index]]
            if node not in seen_nodes:
                nodes.append(node)
                seen_nodes.add(node)
            
            index += 1
        
        return nodes
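
# A quick check of the property that makes consistent hashing worthwhile:
# when a node joins, only the keys whose positions now fall in the new
# node's ranges change owners (roughly 1/N of them in expectation),
# instead of nearly all keys as with a naive hash(key) % num_nodes scheme
demo_ring = ConsistentHashRing(['server1', 'server2', 'server3'])
demo_keys = [f"key_{i}" for i in range(1000)]

before = {k: demo_ring.get_node(k) for k in demo_keys}
demo_ring.add_node('server4')
after = {k: demo_ring.get_node(k) for k in demo_keys}

moved = sum(1 for k in demo_keys if before[k] != after[k])
print(f"Keys reassigned after adding a node: {moved} of {len(demo_keys)}")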

# Example: Distributed cache system
class DistributedCache:
    def __init__(self, cache_nodes):
        self.hash_ring = ConsistentHashRing(cache_nodes)
        self.node_caches = {node: {} for node in cache_nodes}
        self.stats = {node: {'hits': 0, 'misses': 0} for node in cache_nodes}
    
    def put(self, key, value):
        """Store a key-value pair in the appropriate cache node"""
        node = self.hash_ring.get_node(key)
        if node:
            self.node_caches[node][key] = value
            print(f"Stored '{key}' on node {node}")
    
    def get(self, key):
        """Retrieve a value from the appropriate cache node"""
        node = self.hash_ring.get_node(key)
        if node and key in self.node_caches[node]:
            self.stats[node]['hits'] += 1
            print(f"Cache HIT for '{key}' on node {node}")
            return self.node_caches[node][key]
        
        if node:
            self.stats[node]['misses'] += 1
            print(f"Cache MISS for '{key}' on node {node}")
        return None
    
    def add_cache_node(self, node):
        """Add a new cache node and rebalance"""
        print(f"Adding new cache node: {node}")
        
        # Store current data
        all_data = {}
        for node_cache in self.node_caches.values():
            all_data.update(node_cache)
        
        # Add new node
        self.hash_ring.add_node(node)
        self.node_caches[node] = {}
        self.stats[node] = {'hits': 0, 'misses': 0}
        
        # Clear all caches and redistribute everything (kept simple for the
        # demo; a real system would migrate only the keys that now map to
        # the new node, which is the whole point of consistent hashing)
        for cache in self.node_caches.values():
            cache.clear()
        
        # Redistribute all data
        for key, value in all_data.items():
            self.put(key, value)
    
    def get_cache_stats(self):
        """Get statistics for all cache nodes"""
        return {
            node: {
                'items': len(cache),
                'hits': self.stats[node]['hits'],
                'misses': self.stats[node]['misses'],
                'hit_rate': f"{self.stats[node]['hits'] / max(1, self.stats[node]['hits'] + self.stats[node]['misses']):.2%}"
            }
            for node, cache in self.node_caches.items()
        }

# Test distributed cache
cache_nodes = ['server1', 'server2', 'server3']
dist_cache = DistributedCache(cache_nodes)

# Add some data
cache_data = [
    ('user:1', {'name': 'Maya', 'city': 'Seattle'}),
    ('user:2', {'name': 'Alice', 'city': 'Portland'}),
    ('order:100', {'total': 45.50, 'items': 3}),
    ('product:coffee', {'price': 4.50, 'stock': 100}),
    ('session:abc123', {'user_id': 1, 'expires': '2024-01-01'})
]

print("Storing data in distributed cache:")
for key, value in cache_data:
    dist_cache.put(key, value)

print("\nRetrieving data:")
for key, _ in cache_data:
    result = dist_cache.get(key)

print(f"\nCache statistics before adding node:")
for node, stats in dist_cache.get_cache_stats().items():
    print(f"{node}: {stats}")

# Add a new cache node
dist_cache.add_cache_node('server4')

print(f"\nCache statistics after adding node:")
for node, stats in dist_cache.get_cache_stats().items():
    print(f"{node}: {stats}")

Final Thoughts: Hash Tables as the Foundation of Modern Computing

That 30-second-to-200-millisecond optimization at Amazon opened my eyes to the incredible power of choosing the right data structure. Hash tables aren't just an academic concept - they're the invisible foundation powering everything from database indexes to web caches to distributed systems.

Every time you use a Python dictionary, JavaScript object, or Java HashMap, you're leveraging decades of hash table optimization. Understanding how they work under the hood makes you a better developer, whether you're optimizing database queries, building caching systems, or designing distributed architectures.

The beauty of hash tables lies in their simplicity: transform any key into an array index, and you get O(1) lookups. The complexity comes in handling collisions gracefully, maintaining good performance as data grows, and adapting to different use cases.

Whether you're building a simple user preference system or a distributed cache spanning multiple continents, hash tables provide the performance characteristics that make modern applications possible. Master them, and you'll never look at performance problems the same way again.


Currently writing this from Analog Coffee in Capitol Hill, where I'm optimizing a hash-based deduplication system while enjoying my usual cortado. Share your hash table optimization stories @maya_codes_pnw - we've all had those performance breakthrough moments! ⚡☕
