LEARN DATABASE SHARDING DEEP DIVE
Learn Database Sharding & Distributed Databases: From Single Node to Global Scale
Goal: Deeply understand how databases scale horizontally—from basic partitioning to building your own sharded database, understanding the tradeoffs that systems like CockroachDB, Vitess, and DynamoDB make.
Why Learn Database Sharding?
Every successful application eventually faces the same problem: a single database server can’t handle the load. At that point, you need to distribute data across multiple machines. Understanding sharding and distributed databases means:
- Knowing when and how to scale your database horizontally
- Understanding why distributed systems are hard (and what tradeoffs exist)
- Being able to design systems that handle millions of users
- Understanding how companies like Google, Amazon, and Uber build their infrastructure
After completing these projects, you will:
- Implement sharding strategies from scratch
- Understand CAP theorem, ACID, and BASE in practice
- Build distributed consensus and replication
- Know when to use which database for which workload
- Design data models that scale horizontally
Core Concept Analysis
The Scaling Journey
SCALING YOUR DATABASE
Stage 1: Single Server Stage 2: Read Replicas
┌─────────────┐ ┌─────────────┐
│ Primary │ │ Primary │◄── Writes
│ (R + W) │ │ (Writes) │
└─────────────┘ └──────┬──────┘
│ Replication
┌──────┴──────┐
▼ ▼
┌────────┐ ┌────────┐
│Replica │ │Replica │◄── Reads
└────────┘ └────────┘
Stage 3: Sharding Stage 4: Distributed Database
┌─────────┐ ┌─────────┐ ┌─────────────────────────────┐
│ Shard 1 │ │ Shard 2 │ │ Distributed SQL/NoSQL │
│ (A-M) │ │ (N-Z) │ │ (Automatic Sharding + │
└─────────┘ └─────────┘ │ Replication + Consensus) │
┌─────────┐ ┌─────────┐ └─────────────────────────────┘
│ Shard 3 │ │ Shard 4 │ CockroachDB, Spanner, TiDB,
│ (0-4) │ │ (5-9) │ Vitess, Citus, YugabyteDB
└─────────┘ └─────────┘
Fundamental Concepts
1. Partitioning vs Sharding
VERTICAL PARTITIONING HORIZONTAL PARTITIONING (SHARDING)
(Split by columns) (Split by rows)
┌──────────────────────┐ ┌──────────────────────┐
│ id│name│email│bio │ │ id│name│email│bio │
├──────────────────────┤ ├──────────────────────┤
│ 1 │John│j@..│Long... │ │ 1 │John│j@..│... │ ─┐
│ 2 │Jane│ja@.│Long... │ │ 2 │Jane│ja@.│... │ │ Shard 1
│ 3 │Bob │b@..│Long... │ ├──────────────────────┤ ─┘
└──────────────────────┘ │ 3 │Bob │b@..│... │ ─┐
│ │ 4 │Mary│m@..│... │ │ Shard 2
▼ └──────────────────────┘ ─┘
┌────────────┐ ┌───────────┐
│id│name│email│ │id│bio │
├────────────┤ ├───────────┤
│1 │John│j@..│ │1 │Long... │
│2 │Jane│ja@.│ │2 │Long... │
└────────────┘ └───────────┘
(Hot data) (Cold data)
2. Sharding Strategies
Hash-Based Sharding:
shard_id = hash(shard_key) % num_shards
User ID: 12345
hash(12345) = 0x7A3E
0x7A3E % 4 = 2
→ Goes to Shard 2
- ✅ Even distribution
- ❌ Range queries require scatter-gather
- ❌ Resharding requires moving lots of data
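A minimal sketch of this routing rule in Go (FNV-1a is an arbitrary choice; any well-distributed hash works):
package main

import (
    "fmt"
    "hash/fnv"
)

// shardFor maps a key to one of numShards buckets using FNV-1a.
// Plain modulo routing: changing numShards remaps most keys, which is
// exactly the resharding pain noted above.
func shardFor(key string, numShards uint32) uint32 {
    h := fnv.New32a()
    h.Write([]byte(key))
    return h.Sum32() % numShards
}

func main() {
    fmt.Println(shardFor("user:12345", 4)) // prints a shard index in 0..3
}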
Range-Based Sharding:
Shard 1: user_id 1 - 1,000,000
Shard 2: user_id 1,000,001 - 2,000,000
Shard 3: user_id 2,000,001 - 3,000,000
- ✅ Range queries are efficient
- ❌ Hot spots if recent data is accessed more
- ❌ Manual rebalancing needed
Directory-Based Sharding:
┌─────────────────────────────┐
│ Lookup Service │
│ user_123 → Shard 2 │
│ user_456 → Shard 1 │
│ user_789 → Shard 3 │
└─────────────────────────────┘
- ✅ Flexible placement
- ❌ Lookup service is a bottleneck
- ❌ Additional latency
Consistent Hashing:
┌──────────────┐
────►│ Node A │◄────
│ └──────────────┘ │
│ ▲ │
┌────┴────┐ │ ┌─────┴───┐
│ Key X │ Hash Ring │ Node C │
└─────────┘ │ └─────────┘
│ ▼ │
│ ┌──────────────┐ │
────►│ Node B │◄────
└──────────────┘
Keys are assigned to the next node clockwise on the ring.
Adding/removing a node only affects neighboring keys.
3. CAP Theorem
CONSISTENCY
▲
╱ ╲
╱ ╲
╱ ╲
╱ CA ╲
╱ (Single ╲
╱ Node) ╲
╱─────────────╲
╱ │ ╲
╱ CP │ AP ╲
╱ │ ╲
╱ (Mongo │ (Cass- ╲
╱ primary) │ andra) ╲
▼────────────┴────────────▼
PARTITION AVAILABILITY
TOLERANCE
In a distributed system, you must choose:
- CP: Consistency + Partition Tolerance (sacrifice Availability)
- AP: Availability + Partition Tolerance (sacrifice Consistency)
- CA: Only possible without network partitions (single node)
4. ACID vs BASE
| | ACID (Traditional) | BASE (Distributed) |
|---|---|---|
| Stands for | Atomicity, Consistency, Isolation, Durability | Basically Available, Soft state, Eventually consistent |
| Use Case | Banks, financial transactions | Social media, analytics |
| Scaling | Vertical | Horizontal |
5. Replication Strategies
SINGLE-LEADER (Primary-Replica) MULTI-LEADER
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Primary │◄── All Writes │Leader 1 │◄─►│Leader 2 │
└────┬────┘ └────┬────┘ └────┬────┘
│ Async/Sync Replication │ │
┌────┴────┐ ┌─────────┐ ┌────┴────┐ ┌────┴────┐
│Replica 1│ │Replica 2│ │Replica │ │Replica │
└─────────┘ └─────────┘ └─────────┘ └─────────┘
LEADERLESS (Dynamo-style)
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Node 1 │◄─►│ Node 2 │◄─►│ Node 3 │
└─────────┘ └─────────┘ └─────────┘
│ │ │
└─────────────┴─────────────┘
Quorum: W + R > N
(Write to W nodes, Read from R nodes)
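A sketch of a Dynamo-style quorum write under this rule; the send function is a hypothetical per-replica RPC, and the write succeeds as soon as W acknowledgements arrive:
import "errors"

// quorumWrite sends a write to every replica in parallel and succeeds
// once w acknowledgements arrive (leaderless, Dynamo-style).
func quorumWrite(replicas []string, w int, send func(addr string) error) error {
    acks := make(chan error, len(replicas))
    for _, addr := range replicas {
        go func(a string) { acks <- send(a) }(addr)
    }
    ok, failed := 0, 0
    for range replicas {
        if err := <-acks; err != nil {
            failed++
        } else {
            ok++
        }
        if ok >= w {
            return nil // write quorum reached
        }
        if failed > len(replicas)-w {
            return errors.New("write quorum unreachable")
        }
    }
    return errors.New("write quorum not reached")
}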
6. Distributed Transactions
Two-Phase Commit (2PC):
Coordinator
│
Phase 1: Prepare
┌───────────┼───────────┐
▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐
│Node 1 │ │Node 2 │ │Node 3 │
│ VOTE │ │ VOTE │ │ VOTE │
│ YES │ │ YES │ │ YES │
└───┬───┘ └───┬───┘ └───┬───┘
│ │ │
└──────────┼──────────┘
│
Phase 2: Commit (if all YES)
┌───────────┼───────────┐
▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐
│COMMIT │ │COMMIT │ │COMMIT │
└───────┘ └───────┘ └───────┘
Saga Pattern (for long-running transactions):
T1 ──► T2 ──► T3 ──► T4 ──► Success!
│ │ │ │
▼ ▼ ▼ ▼
C1 ◄── C2 ◄── C3 ◄── C4 (Compensating transactions if failure)
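A compact sketch of the saga flow above, with hypothetical step and compensation functions and no framework assumed:
package main

import "fmt"

// A saga step pairs a forward action with its compensating action.
type step struct {
    name       string
    action     func() error
    compensate func()
}

// runSaga runs the steps in order; if one fails, it runs the compensations
// of all previously completed steps in reverse order (C3, C2, C1, ...).
func runSaga(steps []step) error {
    var done []step
    for _, s := range steps {
        if err := s.action(); err != nil {
            for i := len(done) - 1; i >= 0; i-- {
                done[i].compensate()
            }
            return fmt.Errorf("saga aborted at %s: %w", s.name, err)
        }
        done = append(done, s)
    }
    return nil
}

func main() {
    err := runSaga([]step{
        {"reserve inventory", func() error { return nil }, func() { fmt.Println("release inventory") }},
        {"charge card", func() error { return fmt.Errorf("card declined") }, func() { fmt.Println("refund") }},
    })
    fmt.Println(err) // "release inventory" was printed before this error
}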
Prerequisites
Before starting these projects, you should have:
- SQL proficiency - Complex queries, joins, indexes, query plans
- Basic networking - TCP/IP, HTTP, request/response
- Proficiency in one programming language - Python, Go, Java, or Rust recommended
- Docker basics - For running multiple database instances
- Basic data structures - Hash tables, trees, linked lists
Helpful but not required:
- Experience with PostgreSQL or MySQL administration
- Understanding of B-trees and LSM trees
- Basic distributed systems concepts
Development Environment Setup
Required Tools
# Docker for running databases
curl -fsSL https://get.docker.com | sh
# Database clients
sudo apt install postgresql-client mysql-client redis-tools
# Docker Compose for multi-container setups
sudo apt install docker-compose
# Programming language (choose one)
# Python
sudo apt install python3 python3-pip
pip3 install psycopg2-binary pymysql redis
# Go
wget https://go.dev/dl/go1.21.0.linux-amd64.tar.gz
sudo tar -C /usr/local -xzf go1.21.0.linux-amd64.tar.gz
# Rust
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
Quick Test
# Start a PostgreSQL container
docker run -d --name postgres -e POSTGRES_PASSWORD=secret -p 5432:5432 postgres
# Connect
psql -h localhost -U postgres
Project List
Projects progress from understanding fundamentals to building production-grade distributed systems.
Project 1: Query Router and Connection Pooler
- File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
- Main Programming Language: Go
- Alternative Programming Languages: Rust, Python, Java
- Coolness Level: Level 3: Genuinely Clever
- Business Potential: 3. The “Service & Support” Model
- Difficulty: Level 2: Intermediate
- Knowledge Area: Database Proxying / Connection Management
- Software or Tool: PostgreSQL, TCP proxying
- Main Book: “Designing Data-Intensive Applications” by Martin Kleppmann
What you’ll build: A database proxy that sits between your application and PostgreSQL, routing queries to appropriate backends, pooling connections, and providing basic load balancing for read replicas.
Why it teaches sharding: Before you shard, you need to understand how applications talk to databases. A proxy layer is the foundation of sharding and pooling tools such as Vitess, ProxySQL, and PgBouncer. You’ll understand the wire protocol, connection lifecycle, and query routing.
Core challenges you’ll face:
- Parsing the PostgreSQL wire protocol → maps to understanding database communication
- Managing connection pools → maps to resource management in databases
- Routing reads vs writes → maps to replica routing strategies
- Handling failures gracefully → maps to high availability patterns
Key Concepts:
- PostgreSQL protocol: PostgreSQL documentation “Frontend/Backend Protocol”
- Connection pooling: PgBouncer documentation and design
- Read/write splitting: “High Performance MySQL” Chapter 11
- Health checks: Detecting failed backends
Difficulty: Intermediate Time estimate: 2 weeks Prerequisites: Network programming basics, SQL proficiency
Real world outcome:
$ ./dbproxy --config proxy.yaml &
[INFO] Listening on :5433
[INFO] Primary: localhost:5432 (healthy)
[INFO] Replica 1: localhost:5434 (healthy)
[INFO] Replica 2: localhost:5435 (healthy)
[INFO] Connection pool: 10 min, 100 max per backend
# Application connects to proxy
$ psql -h localhost -p 5433 -U app mydb
# Writes go to primary
mydb=> INSERT INTO users (name) VALUES ('Alice');
[PROXY] Routed INSERT to primary:5432
# Reads go to replicas (round-robin)
mydb=> SELECT * FROM users;
[PROXY] Routed SELECT to replica:5434
mydb=> SELECT * FROM users WHERE id = 1;
[PROXY] Routed SELECT to replica:5435
# Check proxy stats
$ curl localhost:8080/stats
{
"connections": {
"active": 5,
"idle": 45,
"waiting": 0
},
"queries": {
"total": 12456,
"reads": 10234,
"writes": 2222
},
"backends": {
"primary": {"healthy": true, "latency_ms": 2},
"replica1": {"healthy": true, "latency_ms": 1},
"replica2": {"healthy": true, "latency_ms": 1}
}
}
Implementation Hints:
PostgreSQL wire protocol basics:
Startup: Client sends StartupMessage with user, database
Auth: Server sends AuthenticationOk (or challenge)
Query: Client sends Query message, Server sends RowDescription + DataRow + CommandComplete
Close: Client sends Terminate
Message format:
┌───────┬────────────┬─────────────┐
│ Type │ Length │ Payload │
│ 1 byte│ 4 bytes │ variable │
└───────┴────────────┴─────────────┘
Query message: 'Q' + length + query_string + '\0'
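A minimal Go sketch of reading one typed message with this framing, assuming the untyped StartupMessage has already been handled:
import (
    "encoding/binary"
    "io"
)

// readMessage reads one typed message: a 1-byte type, a 4-byte big-endian
// length that includes itself (but not the type byte), then the payload.
func readMessage(r io.Reader) (msgType byte, payload []byte, err error) {
    var header [5]byte
    if _, err = io.ReadFull(r, header[:]); err != nil {
        return 0, nil, err
    }
    msgType = header[0]
    length := binary.BigEndian.Uint32(header[1:5])
    if length < 4 {
        return 0, nil, io.ErrUnexpectedEOF // malformed length field
    }
    payload = make([]byte, length-4)
    if _, err = io.ReadFull(r, payload); err != nil {
        return 0, nil, err
    }
    return msgType, payload, nil
}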
Pseudo-code for connection pooling:
pool = {
available: [], // Idle connections
in_use: [], // Active connections
waiting: [], // Clients waiting for connection
}
func get_connection():
if available.len() > 0:
return available.pop()
elif in_use.len() < max_connections:
return create_new_connection()
else:
wait_for_available()
func release_connection(conn):
if waiting.len() > 0:
waiting.pop().give(conn)
else:
available.push(conn)
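The same idea in Go, as a simplified fixed-size pool built on a buffered channel (connection creation on demand and health checks omitted):
import (
    "errors"
    "net"
    "time"
)

// Pool hands out at most cap(idle) connections; Get blocks until one is
// free or the timeout expires, Put returns a connection for reuse.
type Pool struct {
    idle chan net.Conn
}

func NewPool(conns []net.Conn) *Pool {
    idle := make(chan net.Conn, len(conns))
    for _, c := range conns {
        idle <- c
    }
    return &Pool{idle: idle}
}

func (p *Pool) Get(timeout time.Duration) (net.Conn, error) {
    select {
    case c := <-p.idle:
        return c, nil
    case <-time.After(timeout):
        return nil, errors.New("pool: timed out waiting for a connection")
    }
}

func (p *Pool) Put(c net.Conn) {
    p.idle <- c
}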
Read/write detection:
func isWriteQuery(query string) bool {
// Simple prefix heuristic (uses the standard strings package); it misses
// cases like WITH ... INSERT or SELECT ... FOR UPDATE, which need real parsing
query = strings.ToUpper(strings.TrimSpace(query))
return strings.HasPrefix(query, "INSERT") ||
strings.HasPrefix(query, "UPDATE") ||
strings.HasPrefix(query, "DELETE") ||
strings.HasPrefix(query, "CREATE") ||
strings.HasPrefix(query, "ALTER") ||
strings.HasPrefix(query, "DROP")
}
Questions to guide implementation:
- How do you handle transactions that mix reads and writes?
- What happens when a backend becomes unhealthy mid-query?
- How do you detect a “stuck” connection that should be recycled?
- How does connection pooling differ from multiplexing?
Learning milestones:
- Proxy forwards simple queries → You understand the wire protocol
- Connection pooling reduces connection count → You understand resource management
- Reads go to replicas → You understand read/write splitting
- Failover works → You understand health checking
Project 2: Implement Consistent Hashing
- File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
- Main Programming Language: Go
- Alternative Programming Languages: Rust, Python, Java
- Coolness Level: Level 4: Hardcore Tech Flex
- Business Potential: 1. The “Resume Gold”
- Difficulty: Level 2: Intermediate
- Knowledge Area: Distributed Algorithms / Hashing
- Software or Tool: Hash functions, ring data structure
- Main Book: “Designing Data-Intensive Applications” by Martin Kleppmann
What you’ll build: A consistent hashing library with virtual nodes that can map keys to nodes, handle node additions/removals with minimal key redistribution, and provide load balancing statistics.
Why it teaches sharding: Consistent hashing is the foundation of how distributed caches (Memcached), databases (DynamoDB, Cassandra), and CDNs distribute data. Understanding it is essential for designing any distributed system.
Core challenges you’ll face:
- Implementing the hash ring → maps to understanding distributed key placement
- Adding virtual nodes → maps to solving load imbalance
- Handling node changes → maps to minimal data movement
- Measuring distribution quality → maps to load balancing metrics
Key Concepts:
- Consistent hashing: Original paper by Karger et al. (1997)
- Virtual nodes: Spreading each physical node across the ring
- Hash functions: MD5, SHA-1, xxHash for distribution
- Load factor: Measuring how evenly keys are distributed
Difficulty: Intermediate Time estimate: 1 week Prerequisites: Understanding of hash tables, basic data structures
Real world outcome:
$ ./consistent_hash demo
Consistent Hashing Demo
=======================
Adding nodes: A, B, C (100 virtual nodes each)
Ring visualization (simplified):
0°────────90°────────180°────────270°────────360°
│ A │ B │ C │ A │ B │
│ **** │ **** │ **** │ **** │ **** │
Distributing 10,000 keys...
Node A: 3,342 keys (33.4%)
Node B: 3,301 keys (33.0%)
Node C: 3,357 keys (33.6%)
Standard deviation: 0.3%
Adding node D...
Keys moved: 2,456 (24.6%) - only the keys D now owns moved, drawn evenly from A, B, C
Node A: 2,512 keys (25.1%)
Node B: 2,489 keys (24.9%)
Node C: 2,543 keys (25.4%)
Node D: 2,456 keys (24.6%)
Removing node B...
Keys moved: 2,489 (to A, C, D only)
Node A: 3,341 keys (33.4%)
Node C: 3,318 keys (33.2%)
Node D: 3,341 keys (33.4%)
$ ./consistent_hash lookup --key "user:12345"
Key: user:12345
Hash: 0x7A3F2B1C
Position on ring: 171.9°
Primary node: C
Replica nodes: A, D (next 2 clockwise)
Implementation Hints:
Hash ring data structure:
type HashRing struct {
ring []uint64 // Sorted positions on ring
nodeMap map[uint64]string // Position -> node name
nodes map[string]int // Node name -> virtual node count
replicas int // Virtual nodes per physical node
}
func (h *HashRing) AddNode(name string) {
for i := 0; i < h.replicas; i++ {
key := fmt.Sprintf("%s-%d", name, i)
hash := hashKey(key)
h.ring = append(h.ring, hash)
h.nodeMap[hash] = name
}
sort.Slice(h.ring, func(i, j int) bool {
return h.ring[i] < h.ring[j]
})
}
func (h *HashRing) GetNode(key string) string {
hash := hashKey(key)
// Binary search for first position >= hash
idx := sort.Search(len(h.ring), func(i int) bool {
return h.ring[i] >= hash
})
if idx == len(h.ring) {
idx = 0 // Wrap around
}
return h.nodeMap[h.ring[idx]]
}
Getting N replicas (for replication):
func (h *HashRing) GetNodes(key string, n int) []string {
hash := hashKey(key)
idx := sort.Search(len(h.ring), func(i int) bool {
return h.ring[i] >= hash
})
result := make([]string, 0, n)
seen := make(map[string]bool)
for len(result) < n && len(seen) < len(h.nodes) {
node := h.nodeMap[h.ring[idx % len(h.ring)]]
if !seen[node] {
seen[node] = true
result = append(result, node)
}
idx++
}
return result
}
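The hints above call a hashKey helper that isn’t shown; one possible definition using FNV-64a from the standard library:
import "hash/fnv"

// hashKey maps a key (or a "node-i" virtual-node label) to a position on
// the 64-bit ring. Any well-distributed hash works; FNV-64a is convenient.
func hashKey(key string) uint64 {
    h := fnv.New64a()
    h.Write([]byte(key))
    return h.Sum64()
}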
Questions to guide implementation:
- Why do we use virtual nodes instead of just one position per physical node?
- How does the number of virtual nodes affect distribution quality?
- What happens if two virtual nodes hash to the same position?
- How would you implement “weighted” consistent hashing?
Learning milestones:
- Basic ring works → You understand the algorithm
- Virtual nodes improve distribution → You understand load balancing
- Node changes move minimal keys → You understand the key benefit
- You can get N replicas → You understand replication placement
Project 3: Application-Level Sharding (Manual Sharding)
- File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
- Main Programming Language: Python
- Alternative Programming Languages: Go, Java, Node.js
- Coolness Level: Level 3: Genuinely Clever
- Business Potential: 2. The “Micro-SaaS / Pro Tool”
- Difficulty: Level 2: Intermediate
- Knowledge Area: Sharding Strategies / Data Modeling
- Software or Tool: PostgreSQL, application routing
- Main Book: “High Performance MySQL” by Baron Schwartz
What you’ll build: A Python library that implements application-level sharding for a user database—routing queries to the correct shard based on user_id, handling cross-shard queries, and providing a migration path for resharding.
Why it teaches sharding: Before using tools like Vitess or Citus, many companies implement sharding in their application layer. Understanding manual sharding teaches you the fundamental problems that these tools solve, and helps you make better decisions about when and how to shard.
Core challenges you’ll face:
- Choosing and implementing a shard key → maps to data distribution decisions
- Routing queries to correct shard → maps to query routing logic
- Handling cross-shard queries → maps to the scatter-gather pattern
- Resharding without downtime → maps to online data migration
Key Concepts:
- Shard key selection: “Designing Data-Intensive Applications” Chapter 6
- Cross-shard queries: Fan-out and aggregation
- Resharding: Double-write and backfill strategies
- Shard-local vs global data: What to shard and what not to
Difficulty: Intermediate Time estimate: 2 weeks Prerequisites: Project 2 completed, SQL proficiency, Python
Real world outcome:
# Initialize sharded database
from shardlib import ShardedDB
db = ShardedDB(
shards=[
{"host": "shard1.db.local", "range": (0, 1000000)},
{"host": "shard2.db.local", "range": (1000001, 2000000)},
{"host": "shard3.db.local", "range": (2000001, 3000000)},
],
shard_key="user_id"
)
# Simple query - routed to correct shard
user = db.query(
"SELECT * FROM users WHERE user_id = %s",
[12345]
)
# [LOG] Routed to shard1 (user_id 12345 in range 0-1000000)
# Insert - routed by shard key
db.execute(
"INSERT INTO orders (user_id, product, amount) VALUES (%s, %s, %s)",
[12345, "Widget", 99.99]
)
# [LOG] Routed to shard1
# Cross-shard query - scatter-gather
top_users = db.query_all_shards(
"SELECT user_id, SUM(amount) as total FROM orders GROUP BY user_id ORDER BY total DESC LIMIT 10"
)
# [LOG] Scatter to 3 shards, gather and merge results
# Returns combined top 10 across all shards
# Resharding (add a new shard)
db.add_shard({"host": "shard4.db.local", "range": (3000001, 4000000)})
db.reshard(strategy="double-write")
# [LOG] Enabling double-write to old and new shards
# [LOG] Backfilling data from shard3 to shard4
# [LOG] Verifying consistency
# [LOG] Switching reads to new shard
# [LOG] Disabling double-write
$ python -m shardlib stats
Shard Statistics
================
Shard 1 (shard1.db.local):
Users: 342,156
Orders: 1,234,567
Size: 12.3 GB
QPS: 1,234
Shard 2 (shard2.db.local):
Users: 338,901
Orders: 1,198,234
Size: 11.8 GB
QPS: 1,156
Shard 3 (shard3.db.local):
Users: 345,678
Orders: 1,267,890
Size: 12.7 GB
QPS: 1,289
Distribution quality: Good (max deviation: 2.1%)
Implementation Hints:
Shard routing:
class ShardRouter:
def __init__(self, shards, shard_key):
self.shards = shards
self.shard_key = shard_key
def get_shard(self, shard_key_value):
for shard in self.shards:
if shard['range'][0] <= shard_key_value <= shard['range'][1]:
return shard
raise ValueError(f"No shard for key {shard_key_value}")
def extract_shard_key(self, query, params):
    # Naive approach: find "<shard_key> = %s" in the query and return the
    # matching parameter (count the %s placeholders that precede it)
    import re
    match = re.search(rf"{self.shard_key}\s*=\s*%s", query, re.IGNORECASE)
    if match is None:
        return None  # no shard key present: caller must scatter-gather
    return params[query[:match.end()].count("%s") - 1]
Cross-shard query (scatter-gather):
def query_all_shards(self, query, merge_func=None):
results = []
for shard in self.shards:
conn = self.get_connection(shard)
shard_results = conn.execute(query).fetchall()
results.extend(shard_results)
if merge_func:
return merge_func(results)
return results
# For aggregations like SUM, COUNT
def merge_sum(results):
return sum(r[0] for r in results)
# For ORDER BY ... LIMIT
def merge_top_n(results, n, key_func):
return sorted(results, key=key_func, reverse=True)[:n]
Double-write resharding:
def reshard_double_write(self, old_shard, new_shard):
# Phase 1: Enable double-write
self.double_write_enabled = True
self.double_write_targets = [old_shard, new_shard]
# Phase 2: Backfill existing data
cursor = old_shard.execute("SELECT * FROM users WHERE user_id > %s",
[new_shard['range'][0]])
for row in cursor:
new_shard.execute("INSERT INTO users ...", row)
# Phase 3: Verify consistency
# Compare checksums or row counts
# Phase 4: Switch reads
self.update_routing(new_shard)
# Phase 5: Disable double-write
self.double_write_enabled = False
Questions to guide implementation:
- What happens if a query doesn’t include the shard key?
- How do you handle JOINs between tables with different shard keys?
- What’s the difference between logical and physical shards?
- How do you maintain referential integrity across shards?
Learning milestones:
- Simple queries route correctly → You understand shard routing
- Cross-shard queries work → You understand scatter-gather
- Resharding works without data loss → You understand online migration
- Your library handles edge cases → You understand production concerns
Project 4: Build a Key-Value Store with Replication
- File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
- Main Programming Language: Go
- Alternative Programming Languages: Rust, Java
- Coolness Level: Level 4: Hardcore Tech Flex
- Business Potential: 2. The “Micro-SaaS / Pro Tool”
- Difficulty: Level 3: Advanced
- Knowledge Area: Distributed Storage / Replication
- Software or Tool: Custom storage engine, replication protocol
- Main Book: “Designing Data-Intensive Applications” by Martin Kleppmann
What you’ll build: A distributed key-value store with primary-replica replication, supporting both synchronous and asynchronous replication modes, with automatic failover.
Why it teaches sharding: Replication is the other half of distributed databases (sharding + replication). Understanding how data is replicated, how to handle failures, and the consistency tradeoffs prepares you for any distributed database work.
Core challenges you’ll face:
- Implementing the replication log → maps to write-ahead logging
- Synchronous vs asynchronous replication → maps to consistency vs latency tradeoffs
- Handling replica lag → maps to eventual consistency
- Implementing failover → maps to high availability
Key Concepts:
- Replication log: “Designing Data-Intensive Applications” Chapter 5
- Leader election: Raft or simple heartbeat-based
- Consistency levels: Strong, eventual, read-your-writes
- Failover strategies: Automatic vs manual promotion
Difficulty: Advanced Time estimate: 3 weeks Prerequisites: Projects 1-2 completed, understanding of networking
Real world outcome:
$ ./kvstore --mode primary --port 6000 --replicas "localhost:6001,localhost:6002"
[INFO] Starting as PRIMARY on :6000
[INFO] Replica localhost:6001 connected
[INFO] Replica localhost:6002 connected
[INFO] Replication mode: async (configurable per-write)
# On another terminal
$ ./kvstore --mode replica --port 6001 --primary localhost:6000
[INFO] Starting as REPLICA on :6001
[INFO] Connected to primary localhost:6000
[INFO] Replication lag: 0 ops
# Client operations
$ ./kvcli -h localhost:6000
kv> SET user:123 '{"name": "Alice", "email": "alice@example.com"}'
OK (replicated to 2/2 replicas)
kv> GET user:123
{"name": "Alice", "email": "alice@example.com"}
kv> SET important:data '{"critical": true}' --sync
OK (synchronously replicated to 2/2 replicas)
# Check replication status
kv> STATUS
Primary: localhost:6000
Keys: 1,234
Replication log position: 5678
Replicas:
localhost:6001: lag=0 ops, healthy
localhost:6002: lag=2 ops, healthy
# Simulate primary failure
$ kill -9 $(pgrep -f "kvstore.*primary")
# Replica detects failure and one becomes primary
[REPLICA 6001] Primary heartbeat timeout
[REPLICA 6001] Starting election...
[REPLICA 6001] Elected as new PRIMARY
[REPLICA 6002] Acknowledged new primary: localhost:6001
$ ./kvcli -h localhost:6001
kv> GET user:123
{"name": "Alice", "email": "alice@example.com"}
kv> SET user:456 '{"name": "Bob"}'
OK (replicated to 1/1 replicas)
Implementation Hints:
Replication log structure:
type LogEntry struct {
Sequence uint64
Timestamp time.Time
Operation string // SET, DELETE
Key string
Value []byte
}
type ReplicationLog struct {
entries []LogEntry
lastSeq uint64
mu sync.RWMutex
}
func (r *ReplicationLog) Append(op, key string, value []byte) uint64 {
r.mu.Lock()
defer r.mu.Unlock()
r.lastSeq++
entry := LogEntry{
Sequence: r.lastSeq,
Timestamp: time.Now(),
Operation: op,
Key: key,
Value: value,
}
r.entries = append(r.entries, entry)
return r.lastSeq
}
func (r *ReplicationLog) GetEntriesSince(seq uint64) []LogEntry {
r.mu.RLock()
defer r.mu.RUnlock()
// Find entries after seq
for i, e := range r.entries {
if e.Sequence > seq {
return r.entries[i:]
}
}
return nil
}
Replica synchronization:
func (replica *Replica) syncLoop() {
for {
entries := replica.primary.GetEntriesSince(replica.lastSeq)
for _, entry := range entries {
replica.apply(entry)
replica.lastSeq = entry.Sequence
}
time.Sleep(10 * time.Millisecond) // Or use streaming
}
}
Synchronous replication:
func (primary *Primary) SetSync(key string, value []byte) error {
// Write locally
seq := primary.log.Append("SET", key, value)
primary.store.Set(key, value)
// Wait for all replicas
var wg sync.WaitGroup
errors := make(chan error, len(primary.replicas))
for _, replica := range primary.replicas {
wg.Add(1)
go func(r *ReplicaConn) {
defer wg.Done()
if err := r.WaitForSeq(seq, 5*time.Second); err != nil {
errors <- err
}
}(replica)
}
wg.Wait()
close(errors)
for err := range errors {
if err != nil {
return err // Or handle partial success
}
}
return nil
}
Questions to guide implementation:
- What happens if a replica is down during a synchronous write?
- How do you handle a “split-brain” scenario?
- What’s the tradeoff between replication lag and throughput?
- How do you handle a replica that’s too far behind to catch up?
Learning milestones:
- Async replication works → You understand log-based replication
- Sync replication blocks until confirmed → You understand consistency guarantees
- Failover promotes a replica → You understand leader election
- Clients reconnect to new primary → You understand high availability
Project 5: Implement Distributed Transactions (Two-Phase Commit)
- File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
- Main Programming Language: Go
- Alternative Programming Languages: Rust, Java
- Coolness Level: Level 5: Pure Magic
- Business Potential: 3. The “Service & Support” Model
- Difficulty: Level 4: Expert
- Knowledge Area: Distributed Transactions / Consensus
- Software or Tool: Custom transaction coordinator
- Main Book: “Designing Data-Intensive Applications” by Martin Kleppmann
What you’ll build: A transaction coordinator that implements Two-Phase Commit (2PC) across multiple database shards, handling prepare/commit/abort phases, timeouts, and recovery.
Why it teaches sharding: Cross-shard transactions are one of the hardest problems in distributed databases. Understanding 2PC teaches you why distributed transactions are expensive, what can go wrong, and why many systems avoid them.
Core challenges you’ll face:
- Implementing the 2PC protocol → maps to atomic commit across nodes
- Handling coordinator failure → maps to the coordinator as single point of failure
- Implementing participant recovery → maps to crash recovery
- Dealing with blocking → maps to why 2PC is problematic
Key Concepts:
- Two-Phase Commit: “Designing Data-Intensive Applications” Chapter 9
- Prepare/Commit/Abort phases: The protocol states
- Transaction log: For crash recovery
- Heuristic decisions: What participants do when coordinator dies
Difficulty: Expert Time estimate: 3 weeks Prerequisites: Projects 3-4 completed
Real world outcome:
$ ./txcoordinator --shards "shard1:5432,shard2:5432,shard3:5432"
[INFO] Transaction Coordinator started
[INFO] Connected to 3 shards
# Client initiates a distributed transaction
$ ./txclient
tx> BEGIN
Transaction ID: tx-12345
tx> UPDATE shard1.accounts SET balance = balance - 100 WHERE user_id = 1
[tx-12345] Prepared on shard1
tx> UPDATE shard2.accounts SET balance = balance + 100 WHERE user_id = 2
[tx-12345] Prepared on shard2
tx> COMMIT
[tx-12345] Phase 1: PREPARE
shard1: VOTE YES
shard2: VOTE YES
shard3: VOTE YES (no-op, not involved)
[tx-12345] Phase 2: COMMIT
shard1: COMMITTED
shard2: COMMITTED
[tx-12345] Transaction committed successfully
# View transaction log
$ ./txcoordinator --show-log
Transaction Log
===============
tx-12345: COMMITTED
Participants: shard1, shard2
Prepare time: 2024-01-15 10:30:00.123
Commit time: 2024-01-15 10:30:00.145
Duration: 22ms
# Simulate failure during commit
tx> BEGIN
Transaction ID: tx-12346
tx> UPDATE shard1.accounts SET balance = balance - 50 WHERE user_id = 1
tx> UPDATE shard2.accounts SET balance = balance + 50 WHERE user_id = 2
tx> COMMIT
[tx-12346] Phase 1: PREPARE
shard1: VOTE YES
shard2: VOTE NO (constraint violation)
[tx-12346] ABORTING (participant voted NO)
shard1: ROLLBACK
shard2: ROLLBACK
[tx-12346] Transaction aborted
# Recovery after coordinator crash
$ ./txcoordinator --recover
[INFO] Recovering from transaction log...
[INFO] Found 1 in-doubt transaction: tx-12347
[INFO] tx-12347 was in PREPARED state, querying participants...
shard1: PREPARED (waiting)
shard2: PREPARED (waiting)
[INFO] All participants prepared, completing COMMIT
[INFO] Recovery complete
Implementation Hints:
Transaction states:
type TxState int
const (
TxStarted TxState = iota
TxPreparing
TxPrepared
TxCommitting
TxCommitted
TxAborting
TxAborted
)
type Transaction struct {
ID string
State TxState
Participants []string
Operations []Operation
PrepareTime time.Time
CommitTime time.Time
}
Coordinator 2PC logic:
func (c *Coordinator) Commit(tx *Transaction) error {
// Phase 1: Prepare
tx.State = TxPreparing
c.log.Write(tx) // Persist before sending
votes := make(map[string]bool)
for _, p := range tx.Participants {
vote, err := c.sendPrepare(p, tx.ID)
if err != nil || !vote {
return c.abort(tx)
}
votes[p] = vote
}
// All voted YES
tx.State = TxPrepared
c.log.Write(tx)
// Phase 2: Commit
tx.State = TxCommitting
c.log.Write(tx)
for _, p := range tx.Participants {
c.sendCommit(p, tx.ID) // Retry until success
}
tx.State = TxCommitted
tx.CommitTime = time.Now()
c.log.Write(tx)
return nil
}
Participant logic:
func (p *Participant) Prepare(txID string, ops []Operation) bool {
// Acquire locks
for _, op := range ops {
if !p.lockManager.TryLock(op.Key, txID) {
return false // Vote NO
}
}
// Write to WAL
p.wal.Write(txID, "PREPARED", ops)
// Execute but don't commit
for _, op := range ops {
p.executeWithoutCommit(op)
}
return true // Vote YES
}
func (p *Participant) Commit(txID string) {
p.wal.Write(txID, "COMMITTED")
p.commitPending(txID)
p.lockManager.ReleaseLocks(txID)
}
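The recovery flow shown in the sample output can be sketched roughly like this; InDoubtTransactions is a hypothetical log accessor, and the rule it encodes is that a transaction logged as PREPARED (all votes were YES) must be driven to COMMIT on every participant:
// Recover replays the coordinator log after a crash.
func (c *Coordinator) Recover() {
    for _, tx := range c.log.InDoubtTransactions() {
        switch tx.State {
        case TxPrepared, TxCommitting:
            // Every participant voted YES before the crash, so the only
            // safe outcome is COMMIT: re-send it until all acknowledge.
            for _, p := range tx.Participants {
                c.sendCommit(p, tx.ID)
            }
            tx.State = TxCommitted
            c.log.Write(tx)
        default:
            // No commit decision was logged, so aborting is always safe
            // and lets participants release their locks.
            c.abort(tx)
        }
    }
}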
Questions to guide implementation:
- What happens if the coordinator crashes after sending PREPARE but before receiving all votes?
- What happens if a participant crashes after voting YES but before receiving COMMIT?
- Why is 2PC called a “blocking” protocol?
- How does 3PC (Three-Phase Commit) attempt to solve the blocking problem?
Learning milestones:
- Happy path works → You understand the basic protocol
- Participant failure causes abort → You understand voting
- Coordinator recovery works → You understand crash recovery
- You understand the limitations → You know when NOT to use 2PC
Project 6: Build a Sharded Database with Automatic Rebalancing
- File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
- Main Programming Language: Go
- Alternative Programming Languages: Rust, Java
- Coolness Level: Level 5: Pure Magic
- Business Potential: 4. The “Open Core” Infrastructure
- Difficulty: Level 4: Expert
- Knowledge Area: Sharding / Data Migration
- Software or Tool: Custom sharded database
- Main Book: “Designing Data-Intensive Applications” by Martin Kleppmann
What you’ll build: A sharded key-value database that automatically detects hot shards and rebalances data, including live migration of data between shards without downtime.
Why it teaches sharding: Automatic rebalancing is what separates toy sharding implementations from production systems. DynamoDB, CockroachDB, and TiDB all do this. Understanding live data migration teaches you about the hardest operational challenges in distributed databases.
Core challenges you’ll face:
- Detecting hot shards → maps to monitoring and metrics
- Splitting shards → maps to range-based partitioning
- Live data migration → maps to online schema changes
- Maintaining consistency during migration → maps to double-write patterns
Key Concepts:
- Shard splitting: Dividing a range in half
- Live migration: Moving data while serving traffic
- Consistent hashing rebalancing: Adding virtual nodes
- Backpressure: Slowing migration to not impact production
Difficulty: Expert Time estimate: 4 weeks Prerequisites: Projects 2-5 completed
Real world outcome:
$ ./sharddb --init --shards 3
[INFO] Initializing sharded database with 3 shards
[INFO] Shard 1: range [0x0000, 0x5555)
[INFO] Shard 2: range [0x5555, 0xAAAA)
[INFO] Shard 3: range [0xAAAA, 0xFFFF)
# Load data
$ ./sharddb-load --keys 1000000
[INFO] Loading 1,000,000 keys...
[INFO] Distribution: Shard1=333,412 Shard2=332,876 Shard3=333,712
# Simulate hot shard (lots of traffic to one range)
$ ./sharddb-bench --hotspot --range "0x0000-0x2000" --qps 10000
[INFO] Generating hotspot traffic...
# Automatic detection and rebalancing
$ ./sharddb-admin status
Shard Status
============
Shard 1: [0x0000, 0x5555)
Keys: 333,412
QPS: 8,500 ⚠️ HOT
CPU: 85%
Rebalance: TRIGGERED
Shard 2: [0x5555, 0xAAAA)
Keys: 332,876
QPS: 750
CPU: 15%
Shard 3: [0xAAAA, 0xFFFF)
Keys: 333,712
QPS: 750
CPU: 15%
[INFO] Auto-rebalancing: Splitting Shard 1 at 0x2AAA
[INFO] Creating Shard 4 for range [0x2AAA, 0x5555)
[INFO] Migration started: 166,000 keys to move
Migration Progress
==================
Phase 1: Double-write enabled
New writes go to both Shard 1 and Shard 4
████████████████████████░░░░░░ 80%
Phase 2: Backfill
Copying existing keys...
████████████████░░░░░░░░░░░░░░ 55%
Keys moved: 91,300 / 166,000
Rate: 5,000 keys/sec (throttled to reduce impact)
Phase 3: Verification
Comparing checksums...
Phase 4: Cutover
Updating routing table...
Clients now read from Shard 4 for [0x2AAA, 0x5555)
Phase 5: Cleanup
Removing migrated keys from Shard 1...
[INFO] Rebalance complete!
$ ./sharddb-admin status
Shard 1: [0x0000, 0x2AAA) Keys: 167,412 QPS: 4,250 CPU: 45%
Shard 2: [0x5555, 0xAAAA) Keys: 332,876 QPS: 750 CPU: 15%
Shard 3: [0xAAAA, 0xFFFF) Keys: 333,712 QPS: 750 CPU: 15%
Shard 4: [0x2AAA, 0x5555) Keys: 166,000 QPS: 4,250 CPU: 45%
Implementation Hints:
Shard metadata:
type Shard struct {
ID string
RangeStart uint64
RangeEnd uint64
Node string
State ShardState // ACTIVE, SPLITTING, MIGRATING
}
type ShardManager struct {
shards []*Shard
routing *ConsistentHash // Or range-based lookup
migrations []*Migration
}
Hot shard detection:
type ShardMetrics struct {
QPS float64
P99Latency time.Duration
CPUUsage float64
KeyCount int64
}
func (sm *ShardManager) detectHotShards() []*Shard {
var hot []*Shard
avgQPS := sm.averageQPS()
for _, shard := range sm.shards {
metrics := sm.getMetrics(shard.ID)
if metrics.QPS > avgQPS * 2.0 || metrics.CPUUsage > 0.8 {
hot = append(hot, shard)
}
}
return hot
}
Live migration:
func (sm *ShardManager) splitShard(shard *Shard) error {
midpoint := (shard.RangeStart + shard.RangeEnd) / 2
// Create new shard
newShard := &Shard{
ID: uuid.New().String(),
RangeStart: midpoint,
RangeEnd: shard.RangeEnd,
State: MIGRATING,
}
// Phase 1: Enable double-write
sm.enableDoubleWrite(shard, newShard)
// Phase 2: Backfill existing data
cursor := shard.Scan(midpoint, shard.RangeEnd)
for cursor.Next() {
key, value := cursor.KeyValue()
newShard.Put(key, value)
// Throttle to limit impact
time.Sleep(time.Microsecond * 100)
}
// Phase 3: Verify
if !sm.verify(shard, newShard, midpoint) {
return errors.New("verification failed")
}
// Phase 4: Atomic cutover
oldEnd := shard.RangeEnd // remember the original end before shrinking the range
sm.updateRouting(func(r *Routing) {
shard.RangeEnd = midpoint
r.AddShard(newShard)
})
// Phase 5: Cleanup (delete the half that now lives on newShard)
shard.DeleteRange(midpoint, oldEnd)
newShard.State = ACTIVE
sm.disableDoubleWrite(shard, newShard)
return nil
}
Questions to guide implementation:
- How do you handle writes to keys being migrated?
- What if the migration takes longer than expected?
- How do you handle a node failure during migration?
- When should you merge shards instead of splitting them?
Learning milestones:
- Hot shard detection works → You understand monitoring
- Manual split works → You understand the migration protocol
- Automatic rebalancing triggers → You understand policy-based operations
- Live migration doesn’t drop requests → You’ve built a production-grade feature
Project 7: Implement Raft Consensus for Leader Election
- File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
- Main Programming Language: Go
- Alternative Programming Languages: Rust
- Coolness Level: Level 5: Pure Magic
- Business Potential: 4. The “Open Core” Infrastructure
- Difficulty: Level 5: Master
- Knowledge Area: Distributed Consensus / Raft
- Software or Tool: Raft implementation
- Main Book: “In Search of an Understandable Consensus Algorithm” (Raft paper)
What you’ll build: A Raft consensus implementation that can be used for leader election and replicated state machines, the foundation of distributed databases like CockroachDB and etcd.
Why it teaches sharding: Modern distributed databases use Raft (or Paxos) for replication within each shard. Understanding Raft means understanding how databases achieve strong consistency in a distributed environment.
Core challenges you’ll face:
- Implementing leader election → maps to distributed coordination
- Log replication → maps to consistent replication
- Handling network partitions → maps to split-brain prevention
- Log compaction (snapshots) → maps to memory management
Key Concepts:
- Raft paper: “In Search of an Understandable Consensus Algorithm” by Ongaro & Ousterhout
- Terms and elections: How leaders are chosen
- Log matching: Ensuring all nodes have the same log
- Safety guarantees: What Raft guarantees and what it doesn’t
Difficulty: Master Time estimate: 4-6 weeks Prerequisites: Projects 4-5 completed, deep understanding of distributed systems
Real world outcome:
$ ./raft-cluster --nodes 5
[Node1] Starting as FOLLOWER, term=0
[Node2] Starting as FOLLOWER, term=0
[Node3] Starting as FOLLOWER, term=0
[Node4] Starting as FOLLOWER, term=0
[Node5] Starting as FOLLOWER, term=0
# Election timeout on Node3
[Node3] Election timeout, becoming CANDIDATE, term=1
[Node3] Requesting votes...
[Node1] Granting vote to Node3 for term=1
[Node2] Granting vote to Node3 for term=1
[Node4] Granting vote to Node3 for term=1
[Node3] Received 4/5 votes, becoming LEADER for term=1
# Client submits a command
$ ./raft-client --leader node3:8000
raft> SET x = 10
[Node3] Appending to log: index=1, term=1, cmd="SET x = 10"
[Node3] Replicating to followers...
[Node1] Appended entry, log length=1
[Node2] Appended entry, log length=1
[Node4] Appended entry, log length=1
[Node3] Entry committed (3/5 replicas), applying to state machine
OK
raft> GET x
10
# Simulate leader failure
$ kill -9 $(pgrep -f "raft.*node3")
[Node1] Leader heartbeat timeout
[Node5] Leader heartbeat timeout
[Node2] Election timeout, becoming CANDIDATE, term=2
[Node1] Granting vote to Node2 for term=2
[Node4] Granting vote to Node2 for term=2
[Node5] Granting vote to Node2 for term=2
[Node2] Received 4/5 votes, becoming LEADER for term=2
# New leader has all committed entries
$ ./raft-client --leader node2:8000
raft> GET x
10
raft> SET y = 20
OK
# View cluster status
raft> STATUS
Cluster Status
==============
Leader: Node2 (term=2)
Node State Log Length Commit Index
---- ----- ---------- ------------
Node1 Follower 2 2
Node2 Leader 2 2
Node3 (down)
Node4 Follower 2 2
Node5 Follower 2 2
Implementation Hints:
Node state:
type NodeState int
const (
Follower NodeState = iota
Candidate
Leader
)
type RaftNode struct {
// Persistent state
currentTerm int
votedFor string
log []LogEntry
// Volatile state
commitIndex int
lastApplied int
// Leader state
nextIndex map[string]int
matchIndex map[string]int
state NodeState
leaderId string
}
Leader election:
func (n *RaftNode) startElection() {
n.currentTerm++
n.state = Candidate
n.votedFor = n.id
votes := 1 // Vote for self (real code must guard this counter with a mutex or sync/atomic)
for _, peer := range n.peers {
go func(p string) {
reply := n.sendRequestVote(p, &RequestVoteArgs{
Term: n.currentTerm,
CandidateId: n.id,
LastLogIndex: len(n.log) - 1,
LastLogTerm: n.log[len(n.log)-1].Term,
})
if reply.VoteGranted {
votes++
if votes > len(n.peers)/2 {
n.becomeLeader()
}
}
}(peer)
}
}
Log replication:
func (n *RaftNode) appendEntries(peer string) {
prevLogIndex := n.nextIndex[peer] - 1
entries := n.log[n.nextIndex[peer]:]
reply := n.sendAppendEntries(peer, &AppendEntriesArgs{
Term: n.currentTerm,
LeaderId: n.id,
PrevLogIndex: prevLogIndex,
PrevLogTerm: n.log[prevLogIndex].Term,
Entries: entries,
LeaderCommit: n.commitIndex,
})
if reply.Success {
n.matchIndex[peer] = prevLogIndex + len(entries)
n.nextIndex[peer] = n.matchIndex[peer] + 1
n.updateCommitIndex()
} else {
n.nextIndex[peer]-- // Backtrack and retry
}
}
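The snippets above use RequestVoteArgs and AppendEntriesArgs without defining them; minimal versions following the field names in the Raft paper (here LogEntry is assumed to carry at least a Term plus the command, unlike Project 4's log entry):
type RequestVoteArgs struct {
    Term         int    // candidate's term
    CandidateId  string // candidate requesting the vote
    LastLogIndex int    // index of candidate's last log entry
    LastLogTerm  int    // term of candidate's last log entry
}

type RequestVoteReply struct {
    Term        int  // current term, for the candidate to update itself
    VoteGranted bool // true means the candidate received this vote
}

type AppendEntriesArgs struct {
    Term         int        // leader's term
    LeaderId     string     // so followers can redirect clients
    PrevLogIndex int        // index of the entry immediately preceding the new ones
    PrevLogTerm  int        // term of that entry
    Entries      []LogEntry // entries to store (empty for heartbeat)
    LeaderCommit int        // leader's commitIndex
}

type AppendEntriesReply struct {
    Term    int  // current term, for the leader to update itself
    Success bool // true if follower matched PrevLogIndex/PrevLogTerm
}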
Questions to guide implementation:
- Why does Raft use randomized election timeouts?
- What’s the “log matching property” and why is it important?
- How does Raft handle a “split vote”?
- Why can’t a leader commit entries from previous terms directly?
Learning milestones:
- Leader election works → You understand the election protocol
- Log replication works → You understand the AppendEntries RPC
- System survives leader failure → You understand fault tolerance
- You pass the Raft paper’s tests → You have a correct implementation
Project 8: Build a Distributed SQL Query Engine
- File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
- Main Programming Language: Go
- Alternative Programming Languages: Rust, Java
- Coolness Level: Level 5: Pure Magic
- Business Potential: 4. The “Open Core” Infrastructure
- Difficulty: Level 5: Master
- Knowledge Area: Query Processing / Distributed SQL
- Software or Tool: SQL parser, query optimizer
- Main Book: “Database Internals” by Alex Petrov
What you’ll build: A distributed SQL query engine that can parse SQL, create distributed query plans, and execute queries across multiple shards with proper aggregation.
Why it teaches sharding: The hardest part of sharding isn’t data distribution—it’s making it transparent to applications. Distributed SQL engines like CockroachDB, Vitess, and Citus solve this. Understanding query distribution teaches you why these systems are valuable and how they work.
Core challenges you’ll face:
- Parsing and planning SQL → maps to query processing fundamentals
- Distributing queries to shards → maps to query routing
- Handling JOINs across shards → maps to distributed joins
- Aggregating results → maps to parallel aggregation
Key Concepts:
- Query planning: “Database Internals” Chapters on query processing
- Distributed joins: Hash join, broadcast join, collocated join
- Push-down optimization: Execute as much as possible on shards
- Result aggregation: Combining partial results correctly
Difficulty: Master Time estimate: 6+ weeks Prerequisites: All previous projects, SQL internals knowledge
Real world outcome:
$ ./distsql --shards "shard1:5432,shard2:5432,shard3:5432"
[INFO] Distributed SQL Engine started
[INFO] Connected to 3 shards
[INFO] Schema synchronized
distsql> EXPLAIN SELECT * FROM users WHERE user_id = 12345;
Query Plan
==========
→ SingleShardQuery
Shard: shard1 (user_id 12345 routes here)
Query: SELECT * FROM users WHERE user_id = 12345
distsql> EXPLAIN SELECT COUNT(*), AVG(age) FROM users;
Query Plan
==========
→ Aggregate (final)
Functions: SUM(partial_count), SUM(partial_sum)/SUM(partial_count_age)
├── ScatterGather
│ ├── Shard1: SELECT COUNT(*) as partial_count, SUM(age) as partial_sum, COUNT(age) as partial_count_age FROM users
│ ├── Shard2: SELECT COUNT(*) as partial_count, SUM(age) as partial_sum, COUNT(age) as partial_count_age FROM users
│ └── Shard3: SELECT COUNT(*) as partial_count, SUM(age) as partial_sum, COUNT(age) as partial_count_age FROM users
distsql> EXPLAIN SELECT u.name, COUNT(o.id) FROM users u JOIN orders o ON u.user_id = o.user_id GROUP BY u.user_id;
Query Plan
==========
→ Aggregate (final)
Group by: user_id
├── ScatterGather (collocated join - both tables sharded by user_id)
│ ├── Shard1: SELECT u.name, COUNT(o.id) FROM users u JOIN orders o ON u.user_id = o.user_id WHERE u.user_id BETWEEN 0 AND 1000000 GROUP BY u.user_id
│ ├── Shard2: ...
│ └── Shard3: ...
distsql> SELECT u.name, COUNT(o.id) as order_count
FROM users u
JOIN orders o ON u.user_id = o.user_id
GROUP BY u.user_id
ORDER BY order_count DESC
LIMIT 10;
[Executing on 3 shards...]
[Shard1] 2,345 rows scanned, 450 groups
[Shard2] 2,412 rows scanned, 467 groups
[Shard3] 2,301 rows scanned, 443 groups
[Merging results...]
name | order_count
--------------+-------------
John Smith | 156
Jane Doe | 142
Bob Wilson | 138
...
10 rows (45ms, 3 shards queried)
Implementation Hints:
Query plan representation:
type PlanNode interface {
Execute() ([]Row, error)
Children() []PlanNode
}
type ScatterGather struct {
shards []Shard
subquery string
children []PlanNode
}
type Aggregate struct {
functions []AggregateFunc
groupBy []string
child PlanNode
}
type SingleShard struct {
shard Shard
query string
}
Query routing based on shard key:
func (p *Planner) planQuery(query *ParsedQuery) PlanNode {
// Check if query has shard key in WHERE clause
shardKey := p.extractShardKey(query.Where)
if shardKey != nil {
// Route to single shard
shard := p.router.GetShard(shardKey)
return &SingleShard{shard: shard, query: query.String()}
}
// Need to scatter to all shards
return p.planScatterGather(query)
}
Distributed aggregation:
// For AVG, we need to compute SUM and COUNT on each shard,
// then combine: total_sum / total_count
func (p *Planner) rewriteForDistributedAgg(agg *Aggregate) *Aggregate {
var partialFuncs []AggregateFunc
var finalFuncs []AggregateFunc
for _, f := range agg.functions {
switch f.Name {
case "COUNT":
partialFuncs = append(partialFuncs, Count(f.Arg))
finalFuncs = append(finalFuncs, Sum("partial_count"))
case "SUM":
partialFuncs = append(partialFuncs, Sum(f.Arg))
finalFuncs = append(finalFuncs, Sum("partial_sum"))
case "AVG":
// Rewrite AVG(x) to SUM(x)/COUNT(x)
partialFuncs = append(partialFuncs, Sum(f.Arg), Count(f.Arg))
finalFuncs = append(finalFuncs, Divide(Sum("partial_sum"), Sum("partial_count")))
}
}
// ... create two-phase aggregate plan
}
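One of the questions below asks about ORDER BY ... LIMIT across shards; a common approach (sketched here under the assumption that each shard already applied the same ORDER BY and LIMIT n) is to merge the per-shard top-n lists and keep the global top n:
import "sort"

// mergeTopN combines per-shard results that were each already sorted and
// limited to n, and returns the global top n.
// less reports whether row a should come before row b.
func mergeTopN(perShard [][]Row, n int, less func(a, b Row) bool) []Row {
    var all []Row
    for _, rows := range perShard {
        all = append(all, rows...)
    }
    sort.Slice(all, func(i, j int) bool { return less(all[i], all[j]) })
    if len(all) > n {
        all = all[:n]
    }
    return all
}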
Questions to guide implementation:
- How do you handle ORDER BY with LIMIT across shards?
- What’s the difference between a collocated join and a broadcast join?
- How do you handle subqueries that cross shards?
- When is it better to pull data to a coordinator vs. push computation to shards?
Learning milestones:
- Simple queries route correctly → You understand query routing
- Aggregations work across shards → You understand distributed aggregation
- Collocated joins work → You understand join optimization
- Query plans are efficient → You understand push-down optimization
Project 9: Multi-Region Database with Conflict Resolution
- File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
- Main Programming Language: Go
- Alternative Programming Languages: Rust
- Coolness Level: Level 5: Pure Magic
- Business Potential: 5. The “Industry Disruptor”
- Difficulty: Level 5: Master
- Knowledge Area: Multi-Region / Conflict Resolution
- Software or Tool: CRDTs, vector clocks
- Main Book: “Designing Data-Intensive Applications” by Martin Kleppmann
What you’ll build: A multi-region database that allows writes in multiple regions simultaneously, using vector clocks for causality tracking and CRDTs for conflict-free data types.
Why it teaches sharding: Global applications need to write data close to users for low latency. But this creates conflicts. Understanding multi-region replication and conflict resolution teaches you how systems like DynamoDB and CockroachDB handle global scale.
Core challenges you’ll face:
- Implementing vector clocks → maps to causality tracking
- Detecting and resolving conflicts → maps to concurrent writes
- Implementing CRDTs → maps to conflict-free data types
- Cross-region replication → maps to WAN replication
Key Concepts:
- Vector clocks: “Designing Data-Intensive Applications” Chapter 5
- CRDTs: Conflict-free Replicated Data Types
- Last-writer-wins: Simple but lossy conflict resolution
- Causal consistency: Preserving happened-before relationships
Difficulty: Master Time estimate: 4-6 weeks Prerequisites: Projects 4-7 completed
Real world outcome:
# Start multi-region cluster
$ ./multiregion --region us-west --peers "eu-west:8000,ap-east:8000"
[us-west] Multi-region database started
[us-west] Connected to peers: eu-west, ap-east
# Concurrent writes in different regions
# Region: us-west
uswest> SET user:123:name "Alice"
OK (version: {us-west: 1})
# Simultaneously in eu-west
euwest> SET user:123:name "Alicia"
OK (version: {eu-west: 1})
# After replication...
uswest> GET user:123:name
CONFLICT DETECTED!
Version {us-west: 1}: "Alice"
Version {eu-west: 1}: "Alicia"
Resolution strategy: last-writer-wins
Winner: "Alicia" (eu-west timestamp later)
# Using CRDTs for conflict-free updates
uswest> COUNTER user:123:visits INCR 5
OK (version: {us-west: 1}, value: 5)
euwest> COUNTER user:123:visits INCR 3
OK (version: {eu-west: 1}, value: 3)
# After replication - no conflict!
uswest> COUNTER user:123:visits GET
8 (merged: 5 + 3)
# Using G-Set (grow-only set)
uswest> GSET user:123:tags ADD "premium"
OK
euwest> GSET user:123:tags ADD "verified"
OK
uswest> GSET user:123:tags MEMBERS
["premium", "verified"] (union of both additions)
# View causality
uswest> HISTORY user:123:name
Version History (vector clock)
==============================
{us-west: 0, eu-west: 0} → (initial)
{us-west: 1, eu-west: 0} → "Alice" (us-west)
{us-west: 0, eu-west: 1} → "Alicia" (eu-west) [concurrent!]
{us-west: 1, eu-west: 1} → "Alicia" (resolved)
Implementation Hints:
Vector clock:
type VectorClock map[string]uint64
func (vc VectorClock) Increment(nodeId string) {
vc[nodeId]++
}
func (vc VectorClock) Merge(other VectorClock) VectorClock {
result := make(VectorClock)
for k, v := range vc {
result[k] = v
}
for k, v := range other {
if v > result[k] {
result[k] = v
}
}
return result
}
func (vc VectorClock) Compare(other VectorClock) Ordering {
// Returns Before, After, Equal, or Concurrent; a missing entry counts as 0
less, greater := false, false
for k, v := range vc {
if v > other[k] { greater = true }
}
for k, v := range other {
if v > vc[k] { less = true }
}
switch {
case less && greater:
return Concurrent
case less:
return Before
case greater:
return After
default:
return Equal
}
}
G-Counter CRDT:
type GCounter struct {
counts map[string]uint64 // node -> count
}
func (g *GCounter) Increment(nodeId string, amount uint64) {
g.counts[nodeId] += amount
}
func (g *GCounter) Value() uint64 {
var total uint64
for _, v := range g.counts {
total += v
}
return total
}
func (g *GCounter) Merge(other *GCounter) {
for node, count := range other.counts {
if count > g.counts[node] {
g.counts[node] = count
}
}
}
Last-writer-wins register:
type LWWRegister struct {
value interface{}
timestamp time.Time
nodeId string
}
func (r *LWWRegister) Set(value interface{}, ts time.Time, node string) {
if ts.After(r.timestamp) || (ts.Equal(r.timestamp) && node > r.nodeId) {
r.value = value
r.timestamp = ts
r.nodeId = node
}
}
func (r *LWWRegister) Merge(other *LWWRegister) {
r.Set(other.value, other.timestamp, other.nodeId)
}
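The demo output also uses a grow-only set (G-Set), which the hints don’t cover; a minimal sketch (removal is impossible by design, which is what keeps the merge conflict-free):
type GSet struct {
    members map[string]struct{}
}

func NewGSet() *GSet {
    return &GSet{members: make(map[string]struct{})}
}

func (s *GSet) Add(v string) {
    s.members[v] = struct{}{}
}

func (s *GSet) Contains(v string) bool {
    _, ok := s.members[v]
    return ok
}

// Merge is a set union; it is commutative, associative and idempotent,
// so replicas converge regardless of delivery order.
func (s *GSet) Merge(other *GSet) {
    for v := range other.members {
        s.members[v] = struct{}{}
    }
}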
Questions to guide implementation:
- What’s the difference between vector clocks and Lamport timestamps?
- When is last-writer-wins acceptable, and when is it dangerous?
- How do CRDTs achieve eventual consistency without coordination?
- What are the limitations of CRDTs?
Learning milestones:
- Vector clocks track causality → You understand happens-before
- Conflicts are detected → You understand concurrent writes
- CRDTs merge correctly → You understand conflict-free types
- Multi-region replication works → You’ve built a globally distributed system
Project 10: Production-Ready Sharded Database (Capstone)
- File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
- Main Programming Language: Go
- Alternative Programming Languages: Rust
- Coolness Level: Level 5: Pure Magic
- Business Potential: 5. The “Industry Disruptor”
- Difficulty: Level 5: Master
- Knowledge Area: Complete Distributed Database
- Software or Tool: All previous components
- Main Book: All previous books combined
What you’ll build: A complete, production-ready sharded database combining all previous projects: sharding with consistent hashing, replication with Raft, distributed transactions, automatic rebalancing, and a SQL query engine.
Why this is the capstone: This project integrates everything you’ve learned. It’s what companies like CockroachDB, TiDB, and YugabyteDB have built. Completing this means you truly understand distributed databases at a deep level.
Core challenges you’ll face:
- Integration complexity → maps to system design
- Operational features → maps to production readiness
- Performance optimization → maps to real-world constraints
- Testing distributed systems → maps to correctness verification
Prerequisites: All previous projects completed
Real world outcome:
$ ./proddb init --nodes 9 --replication-factor 3
[INFO] Initializing production database cluster
[INFO] 9 nodes, 3-way replication per shard
[INFO] Initial shards: 3 (each served by a Raft group of 3 nodes)
Cluster Topology
================
Shard 1 (range: 0x0000-0x5555)
├── Node1 (leader, us-east-1a)
├── Node2 (follower, us-east-1b)
└── Node3 (follower, us-east-1c)
Shard 2 (range: 0x5555-0xAAAA)
├── Node4 (leader, us-west-2a)
├── Node5 (follower, us-west-2b)
└── Node6 (follower, us-west-2c)
Shard 3 (range: 0xAAAA-0xFFFF)
├── Node7 (leader, eu-west-1a)
├── Node8 (follower, eu-west-1b)
└── Node9 (follower, eu-west-1c)
$ ./proddb sql
proddb> CREATE TABLE users (
id BIGINT PRIMARY KEY,
name TEXT,
email TEXT,
created_at TIMESTAMP
) SHARD BY HASH(id);
proddb> CREATE TABLE orders (
id BIGINT PRIMARY KEY,
user_id BIGINT REFERENCES users(id),
amount DECIMAL,
status TEXT
) SHARD BY HASH(user_id); -- Collocated with users
proddb> INSERT INTO users VALUES (1, 'Alice', 'alice@example.com', NOW());
[Routed to Shard 2, committed via Raft]
proddb> BEGIN;
proddb> UPDATE users SET name = 'Alicia' WHERE id = 1;
proddb> INSERT INTO orders VALUES (100, 1, 99.99, 'pending');
proddb> COMMIT;
[Distributed transaction across Shard 2 (both tables collocated)]
[2PC: Prepare OK, Commit OK]
proddb> SELECT u.name, SUM(o.amount)
FROM users u JOIN orders o ON u.id = o.user_id
GROUP BY u.id;
[Distributed query, collocated join, 3 shards queried]
[Results merged at coordinator]
# Admin commands
$ ./proddb admin rebalance --check
Shard Balance Analysis
======================
Shard 1: 2.1M keys, 45% CPU
Shard 2: 2.3M keys, 52% CPU ← slightly hot
Shard 3: 1.9M keys, 38% CPU
Recommendation: Split Shard 2
$ ./proddb admin rebalance --execute
[Starting automatic rebalancing...]
[Live migration in progress...]
[Rebalancing complete, 4 shards now]
$ ./proddb admin failover --simulate --node Node1
[Simulating Node1 failure...]
[Shard 1: Node2 elected as new leader (Raft term 5)]
[Cluster healthy, no data loss]
$ ./proddb benchmark --duration 60s --threads 100
Benchmark Results (60s, 100 threads)
====================================
Operations: 1,234,567
Throughput: 20,576 ops/sec
Latency P50: 2.3ms
Latency P99: 15.7ms
Latency P999: 45.2ms
Errors: 0
This capstone integrates:
- Consistent hashing (Project 2) for shard placement
- Query routing (Project 1) for connection management
- Application sharding (Project 3) for data distribution
- Replication (Project 4) for durability
- Distributed transactions (Project 5) for ACID
- Auto-rebalancing (Project 6) for operations
- Raft consensus (Project 7) for leader election
- Distributed SQL (Project 8) for query execution
Project Comparison Table
| Project | Difficulty | Time | Depth of Understanding | Fun Factor |
|---|---|---|---|---|
| 1. Query Router | Intermediate | 2 weeks | ⭐⭐⭐ | ⭐⭐⭐ |
| 2. Consistent Hashing | Intermediate | 1 week | ⭐⭐⭐⭐ | ⭐⭐⭐ |
| 3. Application Sharding | Intermediate | 2 weeks | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| 4. KV Store with Replication | Advanced | 3 weeks | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| 5. Two-Phase Commit | Expert | 3 weeks | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ |
| 6. Auto-Rebalancing | Expert | 4 weeks | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| 7. Raft Consensus | Master | 4-6 weeks | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| 8. Distributed SQL Engine | Master | 6+ weeks | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| 9. Multi-Region + CRDTs | Master | 4-6 weeks | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| 10. Capstone | Master | 8+ weeks | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
Recommended Learning Path
Phase 1: Foundations (4-6 weeks)
- Project 1: Query Router - Understand database proxying
- Project 2: Consistent Hashing - Understand distributed key placement
- Project 3: Application Sharding - Implement manual sharding
Phase 2: Replication & Transactions (6-8 weeks)
- Project 4: Replicated KV Store - Understand replication
- Project 5: Two-Phase Commit - Understand distributed transactions
Phase 3: Advanced Operations (6-8 weeks)
- Project 6: Auto-Rebalancing - Understand live migration
- Project 7: Raft Consensus - Understand distributed consensus
Phase 4: Query Processing (6+ weeks)
- Project 8: Distributed SQL Engine - Understand query distribution
Phase 5: Global Scale (4-6 weeks)
- Project 9: Multi-Region with CRDTs - Understand global distribution
Phase 6: Integration (8+ weeks)
- Project 10: Production Database - Combine everything
Total estimated time: 9-12 months of focused learning
Essential Resources
Books (In Priority Order)
- “Designing Data-Intensive Applications” by Martin Kleppmann - THE book for this topic
- “Database Internals” by Alex Petrov - Deep dive into storage engines
- “Understanding Distributed Systems” by Roberto Vitillo - Practical introduction
- “High Performance MySQL” - Practical MySQL scaling
Papers
- Raft: “In Search of an Understandable Consensus Algorithm”
- Dynamo: “Dynamo: Amazon’s Highly Available Key-value Store”
- Spanner: “Spanner: Google’s Globally Distributed Database”
- CockroachDB: “CockroachDB: The Resilient Geo-Distributed SQL Database”
Online Resources
- The Raft Consensus Algorithm - Visualization
- Jepsen.io - Distributed systems testing
- Consistent Hashing Explained
- ByteByteGo - System design concepts
Tools to Study
- Vitess - MySQL sharding
- CockroachDB - Distributed SQL
- TiDB - Distributed MySQL-compatible
- etcd - Raft-based KV store
Summary
| # | Project | Main Language |
|---|---|---|
| 1 | Query Router and Connection Pooler | Go |
| 2 | Consistent Hashing Implementation | Go |
| 3 | Application-Level Sharding Library | Python |
| 4 | Key-Value Store with Replication | Go |
| 5 | Two-Phase Commit Coordinator | Go |
| 6 | Auto-Rebalancing Sharded Database | Go |
| 7 | Raft Consensus Implementation | Go |
| 8 | Distributed SQL Query Engine | Go |
| 9 | Multi-Region Database with CRDTs | Go |
| 10 | Production-Ready Sharded Database (Capstone) | Go |
Final Notes
Database sharding and distributed databases represent some of the hardest problems in computer science. The challenges include:
- Correctness: Distributed systems can fail in subtle ways
- Performance: Network latency dominates everything
- Operations: Running distributed systems is complex
- Tradeoffs: There’s no perfect solution, only tradeoffs
The journey from understanding CAP theorem to building a production database is transformative. You’ll understand why:
- Companies pay millions for CockroachDB and Spanner
- “Just add more shards” is never simple
- Distributed transactions are expensive
- Eventual consistency is often the right choice
Start with Project 1 and the book “Designing Data-Intensive Applications.” By the time you finish, you’ll understand distributed databases at a level few engineers achieve.
Happy distributing! 🗄️