
LEARN DATABASE SHARDING DEEP DIVE

Learn Database Sharding & Distributed Databases: From Single Node to Global Scale

Goal: Deeply understand how databases scale horizontally—from basic partitioning to building your own sharded database, understanding the tradeoffs that systems like CockroachDB, Vitess, and DynamoDB make.


Why Learn Database Sharding?

Every successful application eventually faces the same problem: a single database server can’t handle the load. At that point, you need to distribute data across multiple machines. Understanding sharding and distributed databases means:

  • Knowing when and how to scale your database horizontally
  • Understanding why distributed systems are hard (and what tradeoffs exist)
  • Being able to design systems that handle millions of users
  • Understanding how companies like Google, Amazon, and Uber build their infrastructure

After completing these projects, you will:

  • Implement sharding strategies from scratch
  • Understand CAP theorem, ACID, and BASE in practice
  • Build distributed consensus and replication
  • Know when to use which database for which workload
  • Design data models that scale horizontally

Core Concept Analysis

The Scaling Journey

                        SCALING YOUR DATABASE

    Stage 1: Single Server          Stage 2: Read Replicas
    ┌─────────────┐                 ┌─────────────┐
    │   Primary   │                 │   Primary   │◄── Writes
    │   (R + W)   │                 │   (Writes)  │
    └─────────────┘                 └──────┬──────┘
                                           │ Replication
                                    ┌──────┴──────┐
                                    ▼             ▼
                               ┌────────┐   ┌────────┐
                               │Replica │   │Replica │◄── Reads
                               └────────┘   └────────┘

    Stage 3: Sharding               Stage 4: Distributed Database
    ┌─────────┐ ┌─────────┐        ┌─────────────────────────────┐
    │ Shard 1 │ │ Shard 2 │        │   Distributed SQL/NoSQL     │
    │ (A-M)   │ │ (N-Z)   │        │  (Automatic Sharding +      │
    └─────────┘ └─────────┘        │   Replication + Consensus)  │
    ┌─────────┐ ┌─────────┐        └─────────────────────────────┘
    │ Shard 3 │ │ Shard 4 │        CockroachDB, Spanner, TiDB,
    │ (0-4)   │ │ (5-9)   │        Vitess, Citus, YugabyteDB
    └─────────┘ └─────────┘

Fundamental Concepts

1. Partitioning vs Sharding

VERTICAL PARTITIONING              HORIZONTAL PARTITIONING (SHARDING)
(Split by columns)                 (Split by rows)

┌──────────────────────┐           ┌──────────────────────┐
│ id│name│email│bio    │           │ id│name│email│bio    │
├──────────────────────┤           ├──────────────────────┤
│ 1 │John│j@..│Long... │           │ 1 │John│j@..│...     │ ─┐
│ 2 │Jane│ja@.│Long... │           │ 2 │Jane│ja@.│...     │  │ Shard 1
│ 3 │Bob │b@..│Long... │           ├──────────────────────┤ ─┘
└──────────────────────┘           │ 3 │Bob │b@..│...     │ ─┐
         │                         │ 4 │Mary│m@..│...     │  │ Shard 2
         ▼                         └──────────────────────┘ ─┘
┌────────────┐ ┌───────────┐
│id│name│email│ │id│bio     │
├────────────┤ ├───────────┤
│1 │John│j@..│ │1 │Long... │
│2 │Jane│ja@.│ │2 │Long... │
└────────────┘ └───────────┘
 (Hot data)     (Cold data)

2. Sharding Strategies

Hash-Based Sharding:

shard_id = hash(shard_key) % num_shards

User ID: 12345
hash(12345) = 0x7A3F
0x7A3F % 4 = 3
→ Goes to Shard 3
  • ✅ Even distribution
  • ❌ Range queries require scatter-gather
  • ❌ Resharding requires moving lots of data
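
A minimal Go sketch of the routing rule above (FNV-1a is just an example; any stable, well-distributed hash works):

import "hash/fnv"

// shardFor implements shard_id = hash(shard_key) % num_shards.
func shardFor(shardKey string, numShards uint32) uint32 {
    h := fnv.New32a()
    h.Write([]byte(shardKey))
    return h.Sum32() % numShards
}

// shardFor("user:12345", 4) always maps the same key to the same shard, but
// almost every key moves if numShards changes (the resharding cost noted above).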

Range-Based Sharding:

Shard 1: user_id 1 - 1,000,000
Shard 2: user_id 1,000,001 - 2,000,000
Shard 3: user_id 2,000,001 - 3,000,000
  • ✅ Range queries are efficient
  • ❌ Hot spots if recent data is accessed more
  • ❌ Manual rebalancing needed

Directory-Based Sharding:

┌─────────────────────────────┐
│      Lookup Service         │
│  user_123 → Shard 2         │
│  user_456 → Shard 1         │
│  user_789 → Shard 3         │
└─────────────────────────────┘
  • ✅ Flexible placement
  • ❌ Lookup service is a bottleneck
  • ❌ Additional latency

Consistent Hashing:

                    ┌──────────────┐
               ────►│   Node A     │◄────
              │     └──────────────┘     │
              │            ▲             │
         ┌────┴────┐       │       ┌─────┴───┐
         │ Key X   │   Hash Ring   │ Node C  │
         └─────────┘       │       └─────────┘
              │            ▼             │
              │     ┌──────────────┐     │
               ────►│   Node B     │◄────
                    └──────────────┘

Keys are assigned to the next node clockwise on the ring.
Adding/removing a node only affects neighboring keys.

3. CAP Theorem

                    CONSISTENCY
                         ▲
                        ╱ ╲
                       ╱   ╲
                      ╱     ╲
                     ╱  CA   ╲
                    ╱ (Single ╲
                   ╱   Node)   ╲
                  ╱─────────────╲
                 ╱       │       ╲
                ╱   CP   │   AP   ╲
               ╱         │         ╲
              ╱  (Mongo  │  (Cass-  ╲
             ╱  primary) │  andra)   ╲
            ▼────────────┴────────────▼
     PARTITION                    AVAILABILITY
     TOLERANCE

In a distributed system, you must choose:
- CP: Consistency + Partition Tolerance (sacrifice Availability)
- AP: Availability + Partition Tolerance (sacrifice Consistency)
- CA: Only possible without network partitions (single node)

4. ACID vs BASE

Property    ACID (Traditional)       BASE (Distributed)
--------    ------------------       ------------------
A           Atomicity                Basically Available
C           Consistency              Soft state
I           Isolation                Eventually consistent
D           Durability               -
Use Case    Banks, transactions      Social media, analytics
Scaling     Vertical                 Horizontal

5. Replication Strategies

SINGLE-LEADER (Primary-Replica)     MULTI-LEADER
┌─────────┐                         ┌─────────┐   ┌─────────┐
│ Primary │◄── All Writes           │Leader 1 │◄─►│Leader 2 │
└────┬────┘                         └────┬────┘   └────┬────┘
     │ Async/Sync Replication            │             │
┌────┴────┐  ┌─────────┐           ┌────┴────┐   ┌────┴────┐
│Replica 1│  │Replica 2│           │Replica  │   │Replica  │
└─────────┘  └─────────┘           └─────────┘   └─────────┘

LEADERLESS (Dynamo-style)
┌─────────┐   ┌─────────┐   ┌─────────┐
│ Node 1  │◄─►│ Node 2  │◄─►│ Node 3  │
└─────────┘   └─────────┘   └─────────┘
     │             │             │
     └─────────────┴─────────────┘
           Quorum: W + R > N
           (Write to W nodes, Read from R nodes)
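
A quick way to internalize the quorum rule, as a hedged sketch: a write acknowledged by W nodes and a read that contacts R nodes are guaranteed to overlap on at least one replica whenever W + R > N.

// quorumIntersects reports whether every write quorum of size w and every
// read quorum of size r must share at least one of the n replicas.
func quorumIntersects(n, w, r int) bool {
    return w+r > n
}

// Example: n=3, w=2, r=2 -> true  (a read always sees the latest acknowledged write)
//          n=3, w=1, r=1 -> false (stale reads are possible)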

6. Distributed Transactions

Two-Phase Commit (2PC):

            Coordinator
                │
    Phase 1: Prepare
    ┌───────────┼───────────┐
    ▼           ▼           ▼
┌───────┐  ┌───────┐  ┌───────┐
│Node 1 │  │Node 2 │  │Node 3 │
│ VOTE  │  │ VOTE  │  │ VOTE  │
│  YES  │  │  YES  │  │  YES  │
└───┬───┘  └───┬───┘  └───┬───┘
    │          │          │
    └──────────┼──────────┘
               │
    Phase 2: Commit (if all YES)
    ┌───────────┼───────────┐
    ▼           ▼           ▼
┌───────┐  ┌───────┐  ┌───────┐
│COMMIT │  │COMMIT │  │COMMIT │
└───────┘  └───────┘  └───────┘

Saga Pattern (for long-running transactions):

T1 ──► T2 ──► T3 ──► T4 ──► Success!
│      │      │      │
▼      ▼      ▼      ▼
C1 ◄── C2 ◄── C3 ◄── C4  (Compensating transactions if failure)
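
A minimal saga sketch in Go (illustrative only, not tied to any framework): each step pairs a forward action (T1..T4) with a compensating action (C1..C4), and a failure unwinds the completed steps in reverse order.

type SagaStep struct {
    Action     func() error // forward transaction (T1..T4)
    Compensate func() error // compensating transaction (C1..C4)
}

// RunSaga executes steps in order; if one fails, it runs the compensations of
// all previously completed steps in reverse order and returns the error.
func RunSaga(steps []SagaStep) error {
    for i, step := range steps {
        if err := step.Action(); err != nil {
            for j := i - 1; j >= 0; j-- {
                _ = steps[j].Compensate() // best effort; real sagas log and retry
            }
            return err
        }
    }
    return nil
}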

Prerequisites

Before starting these projects, you should have:

  • SQL proficiency - Complex queries, joins, indexes, query plans
  • Basic networking - TCP/IP, HTTP, request/response
  • One programming language well - Python, Go, Java, or Rust recommended
  • Docker basics - For running multiple database instances
  • Basic data structures - Hash tables, trees, linked lists

Helpful but not required:

  • Experience with PostgreSQL or MySQL administration
  • Understanding of B-trees and LSM trees
  • Basic distributed systems concepts

Development Environment Setup

Required Tools

# Docker for running databases
curl -fsSL https://get.docker.com | sh

# Database clients
sudo apt install postgresql-client mysql-client redis-tools

# Docker Compose for multi-container setups
sudo apt install docker-compose

# Programming language (choose one)
# Python
sudo apt install python3 python3-pip
pip3 install psycopg2-binary pymysql redis

# Go
wget https://go.dev/dl/go1.21.0.linux-amd64.tar.gz
sudo tar -C /usr/local -xzf go1.21.0.linux-amd64.tar.gz

# Rust
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

Quick Test

# Start a PostgreSQL container
docker run -d --name postgres -e POSTGRES_PASSWORD=secret -p 5432:5432 postgres

# Connect
psql -h localhost -U postgres

Project List

Projects progress from understanding fundamentals to building production-grade distributed systems.


Project 1: Query Router and Connection Pooler

  • File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
  • Main Programming Language: Go
  • Alternative Programming Languages: Rust, Python, Java
  • Coolness Level: Level 3: Genuinely Clever
  • Business Potential: 3. The “Service & Support” Model
  • Difficulty: Level 2: Intermediate
  • Knowledge Area: Database Proxying / Connection Management
  • Software or Tool: PostgreSQL, TCP proxying
  • Main Book: “Designing Data-Intensive Applications” by Martin Kleppmann

What you’ll build: A database proxy that sits between your application and PostgreSQL, routing queries to appropriate backends, pooling connections, and providing basic load balancing for read replicas.

Why it teaches sharding: Before you shard, you need to understand how applications talk to databases. A proxy is the foundation of all sharding solutions (Vitess, ProxySQL, PgBouncer). You’ll understand the wire protocol, connection lifecycle, and query routing.

Core challenges you’ll face:

  • Parsing the PostgreSQL wire protocol → maps to understanding database communication
  • Managing connection pools → maps to resource management in databases
  • Routing reads vs writes → maps to replica routing strategies
  • Handling failures gracefully → maps to high availability patterns

Key Concepts:

  • PostgreSQL protocol: PostgreSQL documentation “Frontend/Backend Protocol”
  • Connection pooling: PgBouncer documentation and design
  • Read/write splitting: “High Performance MySQL” Chapter 11
  • Health checks: Detecting failed backends

Difficulty: Intermediate
Time estimate: 2 weeks
Prerequisites: Network programming basics, SQL proficiency

Real world outcome:

$ ./dbproxy --config proxy.yaml &
[INFO] Listening on :5433
[INFO] Primary: localhost:5432 (healthy)
[INFO] Replica 1: localhost:5434 (healthy)
[INFO] Replica 2: localhost:5435 (healthy)
[INFO] Connection pool: 10 min, 100 max per backend

# Application connects to proxy
$ psql -h localhost -p 5433 -U app mydb

# Writes go to primary
mydb=> INSERT INTO users (name) VALUES ('Alice');
[PROXY] Routed INSERT to primary:5432

# Reads go to replicas (round-robin)
mydb=> SELECT * FROM users;
[PROXY] Routed SELECT to replica:5434

mydb=> SELECT * FROM users WHERE id = 1;
[PROXY] Routed SELECT to replica:5435

# Check proxy stats
$ curl localhost:8080/stats
{
  "connections": {
    "active": 5,
    "idle": 45,
    "waiting": 0
  },
  "queries": {
    "total": 12456,
    "reads": 10234,
    "writes": 2222
  },
  "backends": {
    "primary": {"healthy": true, "latency_ms": 2},
    "replica1": {"healthy": true, "latency_ms": 1},
    "replica2": {"healthy": true, "latency_ms": 1}
  }
}

Implementation Hints:

PostgreSQL wire protocol basics:

Startup: Client sends StartupMessage with user, database
Auth:    Server sends AuthenticationOk (or challenge)
Query:   Client sends Query message, Server sends RowDescription + DataRow + CommandComplete
Close:   Client sends Terminate

Message format:

┌───────┬────────────┬─────────────┐
│ Type  │ Length     │ Payload     │
│ 1 byte│ 4 bytes    │ variable    │
└───────┴────────────┴─────────────┘

Query message: 'Q' + length + query_string + '\0'
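
A hedged sketch of reading one regular (post-startup) message with this framing; per the PostgreSQL protocol documentation, the 4-byte length counts itself but not the type byte:

import (
    "encoding/binary"
    "io"
)

// readMessage reads one typed protocol message: a 1-byte type, a 4-byte
// big-endian length (which includes the length field itself), then the payload.
func readMessage(r io.Reader) (msgType byte, payload []byte, err error) {
    var header [5]byte
    if _, err = io.ReadFull(r, header[:]); err != nil {
        return 0, nil, err
    }
    msgType = header[0]
    length := binary.BigEndian.Uint32(header[1:5])
    if length < 4 {
        return 0, nil, io.ErrUnexpectedEOF // malformed frame
    }
    payload = make([]byte, length-4)
    _, err = io.ReadFull(r, payload)
    return msgType, payload, err
}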

Pseudo-code for connection pooling:

pool = {
    available: [],  // Idle connections
    in_use: [],     // Active connections
    waiting: [],    // Clients waiting for connection
}

func get_connection():
    if available.len() > 0:
        return available.pop()
    elif in_use.len() < max_connections:
        return create_new_connection()
    else:
        wait_for_available()

func release_connection(conn):
    if waiting.len() > 0:
        waiting.pop().give(conn)
    else:
        available.push(conn)

Read/write detection:

// Naive heuristic: classify by the leading keyword. Note this misses writes
// hidden inside CTEs (WITH ... INSERT), SELECT ... FOR UPDATE, and functions
// with side effects; those should still be routed to the primary.
func isWriteQuery(query string) bool {
    query = strings.ToUpper(strings.TrimSpace(query))
    return strings.HasPrefix(query, "INSERT") ||
           strings.HasPrefix(query, "UPDATE") ||
           strings.HasPrefix(query, "DELETE") ||
           strings.HasPrefix(query, "CREATE") ||
           strings.HasPrefix(query, "ALTER") ||
           strings.HasPrefix(query, "DROP")
}
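
Health checking can start very simple. A hedged sketch that probes each backend over TCP on an interval (a production proxy would also run a lightweight query such as SELECT 1 and require several consecutive failures before marking a backend down):

import (
    "net"
    "sync/atomic"
    "time"
)

type Backend struct {
    Addr    string
    healthy atomic.Bool
}

// healthLoop periodically probes the backend and updates its health flag,
// which the router consults before choosing a backend for a query.
func (b *Backend) healthLoop(interval time.Duration) {
    for {
        conn, err := net.DialTimeout("tcp", b.Addr, 2*time.Second)
        if err == nil {
            conn.Close()
            b.healthy.Store(true)
        } else {
            b.healthy.Store(false)
        }
        time.Sleep(interval)
    }
}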

Questions to guide implementation:

  1. How do you handle transactions that mix reads and writes?
  2. What happens when a backend becomes unhealthy mid-query?
  3. How do you detect a “stuck” connection that should be recycled?
  4. How does connection pooling differ from multiplexing?

Learning milestones:

  1. Proxy forwards simple queries → You understand the wire protocol
  2. Connection pooling reduces connection count → You understand resource management
  3. Reads go to replicas → You understand read/write splitting
  4. Failover works → You understand health checking

Project 2: Implement Consistent Hashing

  • File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
  • Main Programming Language: Go
  • Alternative Programming Languages: Rust, Python, Java
  • Coolness Level: Level 4: Hardcore Tech Flex
  • Business Potential: 1. The “Resume Gold”
  • Difficulty: Level 2: Intermediate
  • Knowledge Area: Distributed Algorithms / Hashing
  • Software or Tool: Hash functions, ring data structure
  • Main Book: “Designing Data-Intensive Applications” by Martin Kleppmann

What you’ll build: A consistent hashing library with virtual nodes that can map keys to nodes, handle node additions/removals with minimal key redistribution, and provide load balancing statistics.

Why it teaches sharding: Consistent hashing is the foundation of how distributed caches (Memcached), databases (DynamoDB, Cassandra), and CDNs distribute data. Understanding it is essential for designing any distributed system.

Core challenges you’ll face:

  • Implementing the hash ring → maps to understanding distributed key placement
  • Adding virtual nodes → maps to solving load imbalance
  • Handling node changes → maps to minimal data movement
  • Measuring distribution quality → maps to load balancing metrics

Key Concepts:

  • Consistent hashing: Original paper by Karger et al. (1997)
  • Virtual nodes: Spreading each physical node across the ring
  • Hash functions: MD5, SHA-1, xxHash for distribution
  • Load factor: Measuring how evenly keys are distributed

Difficulty: Intermediate
Time estimate: 1 week
Prerequisites: Understanding of hash tables, basic data structures

Real world outcome:

$ ./consistent_hash demo
Consistent Hashing Demo
=======================

Adding nodes: A, B, C (100 virtual nodes each)

Ring visualization (simplified):
  0°────────90°────────180°────────270°────────360°
  │    A    │    B    │    C    │    A    │    B   │
  │  ********************  │

Distributing 10,000 keys...
  Node A: 3,342 keys (33.4%)
  Node B: 3,301 keys (33.0%)
  Node C: 3,357 keys (33.6%)
  Standard deviation: 0.3%

Adding node D...
  Keys moved: 2,456 (24.6%) - only the keys that now fall on D's ring positions!
  Node A: 2,512 keys (25.1%)
  Node B: 2,489 keys (24.9%)
  Node C: 2,543 keys (25.4%)
  Node D: 2,456 keys (24.6%)

Removing node B...
  Keys moved: 2,489 (to A, C, D only)
  Node A: 3,341 keys (33.4%)
  Node C: 3,318 keys (33.2%)
  Node D: 3,341 keys (33.4%)

$ ./consistent_hash lookup --key "user:12345"
Key: user:12345
Hash: 0x7A3F2B1C
Position on ring: 214.7°
Primary node: C
Replica nodes: A, D (next 2 clockwise)

Implementation Hints:

Hash ring data structure:

type HashRing struct {
    ring       []uint64        // Sorted positions on ring
    nodeMap    map[uint64]string // Position -> node name
    nodes      map[string]int  // Node name -> virtual node count
    replicas   int             // Virtual nodes per physical node
}

func (h *HashRing) AddNode(name string) {
    for i := 0; i < h.replicas; i++ {
        key := fmt.Sprintf("%s-%d", name, i)
        hash := hashKey(key)
        h.ring = append(h.ring, hash)
        h.nodeMap[hash] = name
    }
    h.nodes[name] = h.replicas // record the node so GetNodes knows the cluster size
    sort.Slice(h.ring, func(i, j int) bool {
        return h.ring[i] < h.ring[j]
    })
}

func (h *HashRing) GetNode(key string) string {
    hash := hashKey(key)
    // Binary search for first position >= hash
    idx := sort.Search(len(h.ring), func(i int) bool {
        return h.ring[i] >= hash
    })
    if idx == len(h.ring) {
        idx = 0  // Wrap around
    }
    return h.nodeMap[h.ring[idx]]
}

Getting N replicas (for replication):

func (h *HashRing) GetNodes(key string, n int) []string {
    hash := hashKey(key)
    idx := sort.Search(len(h.ring), func(i int) bool {
        return h.ring[i] >= hash
    })

    result := make([]string, 0, n)
    seen := make(map[string]bool)

    for len(result) < n && len(seen) < len(h.nodes) {
        node := h.nodeMap[h.ring[idx % len(h.ring)]]
        if !seen[node] {
            seen[node] = true
            result = append(result, node)
        }
        idx++
    }
    return result
}
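
The hints above call hashKey without defining it; any 64-bit hash with good distribution works. One possible definition plus a usage sketch:

import "hash/fnv"

func hashKey(key string) uint64 {
    h := fnv.New64a()
    h.Write([]byte(key))
    return h.Sum64()
}

// Usage sketch:
//   ring := &HashRing{
//       nodeMap:  make(map[uint64]string),
//       nodes:    make(map[string]int),
//       replicas: 100,
//   }
//   ring.AddNode("A"); ring.AddNode("B"); ring.AddNode("C")
//   primary := ring.GetNode("user:12345")
//   replicaSet := ring.GetNodes("user:12345", 3)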

Questions to guide implementation:

  1. Why do we use virtual nodes instead of just one position per physical node?
  2. How does the number of virtual nodes affect distribution quality?
  3. What happens if two virtual nodes hash to the same position?
  4. How would you implement “weighted” consistent hashing?

Learning milestones:

  1. Basic ring works → You understand the algorithm
  2. Virtual nodes improve distribution → You understand load balancing
  3. Node changes move minimal keys → You understand the key benefit
  4. You can get N replicas → You understand replication placement

Project 3: Application-Level Sharding (Manual Sharding)

  • File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
  • Main Programming Language: Python
  • Alternative Programming Languages: Go, Java, Node.js
  • Coolness Level: Level 3: Genuinely Clever
  • Business Potential: 2. The “Micro-SaaS / Pro Tool”
  • Difficulty: Level 2: Intermediate
  • Knowledge Area: Sharding Strategies / Data Modeling
  • Software or Tool: PostgreSQL, application routing
  • Main Book: “High Performance MySQL” by Baron Schwartz

What you’ll build: A Python library that implements application-level sharding for a user database—routing queries to the correct shard based on user_id, handling cross-shard queries, and providing a migration path for resharding.

Why it teaches sharding: Before using tools like Vitess or Citus, many companies implement sharding in their application layer. Understanding manual sharding teaches you the fundamental problems that these tools solve, and helps you make better decisions about when and how to shard.

Core challenges you’ll face:

  • Choosing and implementing a shard key → maps to data distribution decisions
  • Routing queries to correct shard → maps to query routing logic
  • Handling cross-shard queries → maps to the scatter-gather pattern
  • Resharding without downtime → maps to online data migration

Key Concepts:

  • Shard key selection: “Designing Data-Intensive Applications” Chapter 6
  • Cross-shard queries: Fan-out and aggregation
  • Resharding: Double-write and backfill strategies
  • Shard-local vs global data: What to shard and what not to

Difficulty: Intermediate
Time estimate: 2 weeks
Prerequisites: Project 2 completed, SQL proficiency, Python

Real world outcome:

# Initialize sharded database
from shardlib import ShardedDB

db = ShardedDB(
    shards=[
        {"host": "shard1.db.local", "range": (0, 1000000)},
        {"host": "shard2.db.local", "range": (1000001, 2000000)},
        {"host": "shard3.db.local", "range": (2000001, 3000000)},
    ],
    shard_key="user_id"
)

# Simple query - routed to correct shard
user = db.query(
    "SELECT * FROM users WHERE user_id = %s",
    [12345]
)
# [LOG] Routed to shard1 (user_id 12345 in range 0-1000000)

# Insert - routed by shard key
db.execute(
    "INSERT INTO orders (user_id, product, amount) VALUES (%s, %s, %s)",
    [12345, "Widget", 99.99]
)
# [LOG] Routed to shard1

# Cross-shard query - scatter-gather
top_users = db.query_all_shards(
    "SELECT user_id, SUM(amount) as total FROM orders GROUP BY user_id ORDER BY total DESC LIMIT 10"
)
# [LOG] Scatter to 3 shards, gather and merge results
# Returns combined top 10 across all shards

# Resharding (add a new shard)
db.add_shard({"host": "shard4.db.local", "range": (3000001, 4000000)})
db.reshard(strategy="double-write")
# [LOG] Enabling double-write to old and new shards
# [LOG] Backfilling data from shard3 to shard4
# [LOG] Verifying consistency
# [LOG] Switching reads to new shard
# [LOG] Disabling double-write

$ python -m shardlib stats
Shard Statistics
================
Shard 1 (shard1.db.local):
  Users: 342,156
  Orders: 1,234,567
  Size: 12.3 GB
  QPS: 1,234

Shard 2 (shard2.db.local):
  Users: 338,901
  Orders: 1,198,234
  Size: 11.8 GB
  QPS: 1,156

Shard 3 (shard3.db.local):
  Users: 345,678
  Orders: 1,267,890
  Size: 12.7 GB
  QPS: 1,289

Distribution quality: Good (max deviation: 2.1%)

Implementation Hints:

Shard routing:

import re

class ShardRouter:
    def __init__(self, shards, shard_key):
        self.shards = shards
        self.shard_key = shard_key

    def get_shard(self, shard_key_value):
        for shard in self.shards:
            if shard['range'][0] <= shard_key_value <= shard['range'][1]:
                return shard
        raise ValueError(f"No shard for key {shard_key_value}")

    def extract_shard_key(self, query, params):
        # Minimal approach: find "<shard_key> = %s" in the query and return
        # the matching positional parameter. A real router should use a SQL
        # parser (e.g. sqlparse) to inspect the WHERE clause properly.
        match = re.search(rf"{self.shard_key}\s*=\s*%s", query, re.IGNORECASE)
        if match is None:
            return None  # No shard key in query -> fall back to scatter-gather
        idx = query[:match.start()].count("%s")
        return params[idx]

Cross-shard query (scatter-gather):

def query_all_shards(self, query, merge_func=None):
    results = []
    for shard in self.shards:
        conn = self.get_connection(shard)
        shard_results = conn.execute(query).fetchall()
        results.extend(shard_results)

    if merge_func:
        return merge_func(results)
    return results

# For aggregations like SUM, COUNT
def merge_sum(results):
    return sum(r[0] for r in results)

# For ORDER BY ... LIMIT
def merge_top_n(results, n, key_func):
    return sorted(results, key=key_func, reverse=True)[:n]

Double-write resharding:

def reshard_double_write(self, old_shard, new_shard):
    # Phase 1: Enable double-write
    self.double_write_enabled = True
    self.double_write_targets = [old_shard, new_shard]

    # Phase 2: Backfill existing data
    cursor = old_shard.execute("SELECT * FROM users WHERE user_id > %s",
                                [new_shard['range'][0]])
    for row in cursor:
        new_shard.execute("INSERT INTO users ...", row)

    # Phase 3: Verify consistency
    # Compare checksums or row counts

    # Phase 4: Switch reads
    self.update_routing(new_shard)

    # Phase 5: Disable double-write
    self.double_write_enabled = False

Questions to guide implementation:

  1. What happens if a query doesn’t include the shard key?
  2. How do you handle JOINs between tables with different shard keys?
  3. What’s the difference between logical and physical shards?
  4. How do you maintain referential integrity across shards?

Learning milestones:

  1. Simple queries route correctly → You understand shard routing
  2. Cross-shard queries work → You understand scatter-gather
  3. Resharding works without data loss → You understand online migration
  4. Your library handles edge cases → You understand production concerns

Project 4: Build a Key-Value Store with Replication

  • File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
  • Main Programming Language: Go
  • Alternative Programming Languages: Rust, Java
  • Coolness Level: Level 4: Hardcore Tech Flex
  • Business Potential: 2. The “Micro-SaaS / Pro Tool”
  • Difficulty: Level 3: Advanced
  • Knowledge Area: Distributed Storage / Replication
  • Software or Tool: Custom storage engine, replication protocol
  • Main Book: “Designing Data-Intensive Applications” by Martin Kleppmann

What you’ll build: A distributed key-value store with primary-replica replication, supporting both synchronous and asynchronous replication modes, with automatic failover.

Why it teaches sharding: Replication is the other half of distributed databases (sharding + replication). Understanding how data is replicated, how to handle failures, and the consistency tradeoffs prepares you for any distributed database work.

Core challenges you’ll face:

  • Implementing the replication log → maps to write-ahead logging
  • Synchronous vs asynchronous replication → maps to consistency vs latency tradeoffs
  • Handling replica lag → maps to eventual consistency
  • Implementing failover → maps to high availability

Key Concepts:

  • Replication log: “Designing Data-Intensive Applications” Chapter 5
  • Leader election: Raft or simple heartbeat-based
  • Consistency levels: Strong, eventual, read-your-writes
  • Failover strategies: Automatic vs manual promotion

Difficulty: Advanced
Time estimate: 3 weeks
Prerequisites: Projects 1-2 completed, understanding of networking

Real world outcome:

$ ./kvstore --mode primary --port 6000 --replicas "localhost:6001,localhost:6002"
[INFO] Starting as PRIMARY on :6000
[INFO] Replica localhost:6001 connected
[INFO] Replica localhost:6002 connected
[INFO] Replication mode: async (configurable per-write)

# On another terminal
$ ./kvstore --mode replica --port 6001 --primary localhost:6000
[INFO] Starting as REPLICA on :6001
[INFO] Connected to primary localhost:6000
[INFO] Replication lag: 0 ops

# Client operations
$ ./kvcli -h localhost:6000
kv> SET user:123 '{"name": "Alice", "email": "alice@example.com"}'
OK (replicated to 2/2 replicas)

kv> GET user:123
{"name": "Alice", "email": "alice@example.com"}

kv> SET important:data '{"critical": true}' --sync
OK (synchronously replicated to 2/2 replicas)

# Check replication status
kv> STATUS
Primary: localhost:6000
  Keys: 1,234
  Replication log position: 5678

Replicas:
  localhost:6001: lag=0 ops, healthy
  localhost:6002: lag=2 ops, healthy

# Simulate primary failure
$ kill -9 $(pgrep -f "kvstore.*primary")

# Replica detects failure and one becomes primary
[REPLICA 6001] Primary heartbeat timeout
[REPLICA 6001] Starting election...
[REPLICA 6001] Elected as new PRIMARY
[REPLICA 6002] Acknowledged new primary: localhost:6001

$ ./kvcli -h localhost:6001
kv> GET user:123
{"name": "Alice", "email": "alice@example.com"}

kv> SET user:456 '{"name": "Bob"}'
OK (replicated to 1/1 replicas)

Implementation Hints:

Replication log structure:

type LogEntry struct {
    Sequence  uint64
    Timestamp time.Time
    Operation string    // SET, DELETE
    Key       string
    Value     []byte
}

type ReplicationLog struct {
    entries []LogEntry
    lastSeq uint64
    mu      sync.RWMutex
}

func (r *ReplicationLog) Append(op, key string, value []byte) uint64 {
    r.mu.Lock()
    defer r.mu.Unlock()

    r.lastSeq++
    entry := LogEntry{
        Sequence:  r.lastSeq,
        Timestamp: time.Now(),
        Operation: op,
        Key:       key,
        Value:     value,
    }
    r.entries = append(r.entries, entry)
    return r.lastSeq
}

func (r *ReplicationLog) GetEntriesSince(seq uint64) []LogEntry {
    r.mu.RLock()
    defer r.mu.RUnlock()

    // Find entries after seq
    for i, e := range r.entries {
        if e.Sequence > seq {
            return r.entries[i:]
        }
    }
    return nil
}

Replica synchronization:

func (replica *Replica) syncLoop() {
    for {
        entries := replica.primary.GetEntriesSince(replica.lastSeq)
        for _, entry := range entries {
            replica.apply(entry)
            replica.lastSeq = entry.Sequence
        }
        time.Sleep(10 * time.Millisecond)  // Or use streaming
    }
}

Synchronous replication:

func (primary *Primary) SetSync(key string, value []byte) error {
    // Write locally
    seq := primary.log.Append("SET", key, value)
    primary.store.Set(key, value)

    // Wait for all replicas
    var wg sync.WaitGroup
    errors := make(chan error, len(primary.replicas))

    for _, replica := range primary.replicas {
        wg.Add(1)
        go func(r *ReplicaConn) {
            defer wg.Done()
            if err := r.WaitForSeq(seq, 5*time.Second); err != nil {
                errors <- err
            }
        }(replica)
    }

    wg.Wait()
    close(errors)

    for err := range errors {
        if err != nil {
            return err  // Or handle partial success
        }
    }
    return nil
}
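
Failover requires the replicas to notice that the primary is gone. A hedged sketch of heartbeat-based failure detection (types and field names are illustrative):

import (
    "sync"
    "time"
)

type FailureDetector struct {
    mu            sync.Mutex
    lastHeartbeat time.Time
    timeout       time.Duration
}

// RecordHeartbeat is called whenever a heartbeat arrives from the primary.
func (f *FailureDetector) RecordHeartbeat() {
    f.mu.Lock()
    defer f.mu.Unlock()
    f.lastHeartbeat = time.Now()
}

// PrimaryDown reports whether the primary missed its heartbeat window.
// A replica seeing this would start an election (or promote itself,
// depending on the failover strategy you choose).
func (f *FailureDetector) PrimaryDown() bool {
    f.mu.Lock()
    defer f.mu.Unlock()
    return time.Since(f.lastHeartbeat) > f.timeout
}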

Questions to guide implementation:

  1. What happens if a replica is down during a synchronous write?
  2. How do you handle a “split-brain” scenario?
  3. What’s the tradeoff between replication lag and throughput?
  4. How do you handle a replica that’s too far behind to catch up?

Learning milestones:

  1. Async replication works → You understand log-based replication
  2. Sync replication blocks until confirmed → You understand consistency guarantees
  3. Failover promotes a replica → You understand leader election
  4. Clients reconnect to new primary → You understand high availability

Project 5: Implement Distributed Transactions (Two-Phase Commit)

  • File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
  • Main Programming Language: Go
  • Alternative Programming Languages: Rust, Java
  • Coolness Level: Level 5: Pure Magic
  • Business Potential: 3. The “Service & Support” Model
  • Difficulty: Level 4: Expert
  • Knowledge Area: Distributed Transactions / Consensus
  • Software or Tool: Custom transaction coordinator
  • Main Book: “Designing Data-Intensive Applications” by Martin Kleppmann

What you’ll build: A transaction coordinator that implements Two-Phase Commit (2PC) across multiple database shards, handling prepare/commit/abort phases, timeouts, and recovery.

Why it teaches sharding: Cross-shard transactions are one of the hardest problems in distributed databases. Understanding 2PC teaches you why distributed transactions are expensive, what can go wrong, and why many systems avoid them.

Core challenges you’ll face:

  • Implementing the 2PC protocol → maps to atomic commit across nodes
  • Handling coordinator failure → maps to the coordinator as single point of failure
  • Implementing participant recovery → maps to crash recovery
  • Dealing with blocking → maps to why 2PC is problematic

Key Concepts:

  • Two-Phase Commit: “Designing Data-Intensive Applications” Chapter 9
  • Prepare/Commit/Abort phases: The protocol states
  • Transaction log: For crash recovery
  • Heuristic decisions: What participants do when coordinator dies

Difficulty: Expert
Time estimate: 3 weeks
Prerequisites: Projects 3-4 completed

Real world outcome:

$ ./txcoordinator --shards "shard1:5432,shard2:5432,shard3:5432"
[INFO] Transaction Coordinator started
[INFO] Connected to 3 shards

# Client initiates a distributed transaction
$ ./txclient
tx> BEGIN
Transaction ID: tx-12345

tx> UPDATE shard1.accounts SET balance = balance - 100 WHERE user_id = 1
[tx-12345] Prepared on shard1

tx> UPDATE shard2.accounts SET balance = balance + 100 WHERE user_id = 2
[tx-12345] Prepared on shard2

tx> COMMIT
[tx-12345] Phase 1: PREPARE
  shard1: VOTE YES
  shard2: VOTE YES
  shard3: VOTE YES (no-op, not involved)
[tx-12345] Phase 2: COMMIT
  shard1: COMMITTED
  shard2: COMMITTED
[tx-12345] Transaction committed successfully

# View transaction log
$ ./txcoordinator --show-log
Transaction Log
===============
tx-12345: COMMITTED
  Participants: shard1, shard2
  Prepare time: 2024-01-15 10:30:00.123
  Commit time:  2024-01-15 10:30:00.145
  Duration: 22ms

# Simulate failure during commit
tx> BEGIN
Transaction ID: tx-12346

tx> UPDATE shard1.accounts SET balance = balance - 50 WHERE user_id = 1
tx> UPDATE shard2.accounts SET balance = balance + 50 WHERE user_id = 2
tx> COMMIT
[tx-12346] Phase 1: PREPARE
  shard1: VOTE YES
  shard2: VOTE NO (constraint violation)
[tx-12346] ABORTING (participant voted NO)
  shard1: ROLLBACK
  shard2: ROLLBACK
[tx-12346] Transaction aborted

# Recovery after coordinator crash
$ ./txcoordinator --recover
[INFO] Recovering from transaction log...
[INFO] Found 1 in-doubt transaction: tx-12347
[INFO] tx-12347 was in PREPARED state, querying participants...
  shard1: PREPARED (waiting)
  shard2: PREPARED (waiting)
[INFO] All participants prepared, completing COMMIT
[INFO] Recovery complete

Implementation Hints:

Transaction states:

type TxState int
const (
    TxStarted TxState = iota
    TxPreparing
    TxPrepared
    TxCommitting
    TxCommitted
    TxAborting
    TxAborted
)

type Transaction struct {
    ID           string
    State        TxState
    Participants []string
    Operations   []Operation
    PrepareTime  time.Time
    CommitTime   time.Time
}

Coordinator 2PC logic:

func (c *Coordinator) Commit(tx *Transaction) error {
    // Phase 1: Prepare
    tx.State = TxPreparing
    c.log.Write(tx)  // Persist before sending

    votes := make(map[string]bool)
    for _, p := range tx.Participants {
        vote, err := c.sendPrepare(p, tx.ID)
        if err != nil || !vote {
            return c.abort(tx)
        }
        votes[p] = vote
    }

    // All voted YES
    tx.State = TxPrepared
    c.log.Write(tx)

    // Phase 2: Commit
    tx.State = TxCommitting
    c.log.Write(tx)

    for _, p := range tx.Participants {
        c.sendCommit(p, tx.ID)  // Retry until success
    }

    tx.State = TxCommitted
    tx.CommitTime = time.Now()
    c.log.Write(tx)

    return nil
}

Participant logic:

func (p *Participant) Prepare(txID string, ops []Operation) bool {
    // Acquire locks
    for _, op := range ops {
        if !p.lockManager.TryLock(op.Key, txID) {
            return false  // Vote NO
        }
    }

    // Write to WAL
    p.wal.Write(txID, "PREPARED", ops)

    // Execute but don't commit
    for _, op := range ops {
        p.executeWithoutCommit(op)
    }

    return true  // Vote YES
}

func (p *Participant) Commit(txID string) {
    p.wal.Write(txID, "COMMITTED")
    p.commitPending(txID)
    p.lockManager.ReleaseLocks(txID)
}
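
Coordinator recovery (the --recover flow in the outcome above) amounts to replaying the coordinator log and finishing whatever was left in doubt. A hedged sketch reusing the types from these hints (InDoubtTransactions is a hypothetical helper over the log):

// Recover replays the coordinator's transaction log after a crash. Anything
// that reached TxPrepared (all participants voted YES) must be committed;
// anything still in TxPreparing or TxStarted is aborted, because we cannot
// know whether every participant voted YES.
func (c *Coordinator) Recover() {
    for _, tx := range c.log.InDoubtTransactions() { // hypothetical helper
        switch tx.State {
        case TxPrepared, TxCommitting:
            for _, p := range tx.Participants {
                c.sendCommit(p, tx.ID) // idempotent; retried until it succeeds
            }
            tx.State = TxCommitted
        case TxPreparing, TxStarted:
            c.abort(tx)
        }
        c.log.Write(tx)
    }
}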

Questions to guide implementation:

  1. What happens if the coordinator crashes after sending PREPARE but before receiving all votes?
  2. What happens if a participant crashes after voting YES but before receiving COMMIT?
  3. Why is 2PC called a “blocking” protocol?
  4. How does 3PC (Three-Phase Commit) attempt to solve the blocking problem?

Learning milestones:

  1. Happy path works → You understand the basic protocol
  2. Participant failure causes abort → You understand voting
  3. Coordinator recovery works → You understand crash recovery
  4. You understand the limitations → You know when NOT to use 2PC

Project 6: Build a Sharded Database with Automatic Rebalancing

  • File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
  • Main Programming Language: Go
  • Alternative Programming Languages: Rust, Java
  • Coolness Level: Level 5: Pure Magic
  • Business Potential: 4. The “Open Core” Infrastructure
  • Difficulty: Level 4: Expert
  • Knowledge Area: Sharding / Data Migration
  • Software or Tool: Custom sharded database
  • Main Book: “Designing Data-Intensive Applications” by Martin Kleppmann

What you’ll build: A sharded key-value database that automatically detects hot shards and rebalances data, including live migration of data between shards without downtime.

Why it teaches sharding: Automatic rebalancing is what separates toy sharding implementations from production systems. DynamoDB, CockroachDB, and TiDB all do this. Understanding live data migration teaches you about the hardest operational challenges in distributed databases.

Core challenges you’ll face:

  • Detecting hot shards → maps to monitoring and metrics
  • Splitting shards → maps to range-based partitioning
  • Live data migration → maps to online schema changes
  • Maintaining consistency during migration → maps to double-write patterns

Key Concepts:

  • Shard splitting: Dividing a range in half
  • Live migration: Moving data while serving traffic
  • Consistent hashing rebalancing: Adding virtual nodes
  • Backpressure: Slowing migration to not impact production

Difficulty: Expert
Time estimate: 4 weeks
Prerequisites: Projects 2-5 completed

Real world outcome:

$ ./sharddb --init --shards 3
[INFO] Initializing sharded database with 3 shards
[INFO] Shard 1: range [0x0000, 0x5555)
[INFO] Shard 2: range [0x5555, 0xAAAA)
[INFO] Shard 3: range [0xAAAA, 0xFFFF)

# Load data
$ ./sharddb-load --keys 1000000
[INFO] Loading 1,000,000 keys...
[INFO] Distribution: Shard1=333,412 Shard2=332,876 Shard3=333,712

# Simulate hot shard (lots of traffic to one range)
$ ./sharddb-bench --hotspot --range "0x0000-0x2000" --qps 10000
[INFO] Generating hotspot traffic...

# Automatic detection and rebalancing
$ ./sharddb-admin status
Shard Status
============
Shard 1: [0x0000, 0x5555)
  Keys: 333,412
  QPS: 8,500 ⚠️ HOT
  CPU: 85%
  Rebalance: TRIGGERED

Shard 2: [0x5555, 0xAAAA)
  Keys: 332,876
  QPS: 750
  CPU: 15%

Shard 3: [0xAAAA, 0xFFFF)
  Keys: 333,712
  QPS: 750
  CPU: 15%

[INFO] Auto-rebalancing: Splitting Shard 1 at 0x2AAA
[INFO] Creating Shard 4 for range [0x2AAA, 0x5555)
[INFO] Migration started: 166,000 keys to move

Migration Progress
==================
Phase 1: Double-write enabled
  New writes go to both Shard 1 and Shard 4
  ████████████████████████░░░░░░ 80%

Phase 2: Backfill
  Copying existing keys...
  ████████████████░░░░░░░░░░░░░░ 55%
  Keys moved: 91,300 / 166,000
  Rate: 5,000 keys/sec (throttled to reduce impact)

Phase 3: Verification
  Comparing checksums...

Phase 4: Cutover
  Updating routing table...
  Clients now read from Shard 4 for [0x2AAA, 0x5555)

Phase 5: Cleanup
  Removing migrated keys from Shard 1...

[INFO] Rebalance complete!

$ ./sharddb-admin status
Shard 1: [0x0000, 0x2AAA)  Keys: 167,412  QPS: 4,250  CPU: 45%
Shard 2: [0x5555, 0xAAAA)  Keys: 332,876  QPS: 750    CPU: 15%
Shard 3: [0xAAAA, 0xFFFF)  Keys: 333,712  QPS: 750    CPU: 15%
Shard 4: [0x2AAA, 0x5555)  Keys: 166,000  QPS: 4,250  CPU: 45%

Implementation Hints:

Shard metadata:

type Shard struct {
    ID        string
    RangeStart uint64
    RangeEnd   uint64
    Node       string
    State      ShardState  // ACTIVE, SPLITTING, MIGRATING
}

type ShardManager struct {
    shards      []*Shard
    routing     *ConsistentHash  // Or range-based lookup
    migrations  []*Migration
}

Hot shard detection:

type ShardMetrics struct {
    QPS      float64
    P99Latency time.Duration
    CPUUsage   float64
    KeyCount   int64
}

func (sm *ShardManager) detectHotShards() []*Shard {
    var hot []*Shard
    avgQPS := sm.averageQPS()

    for _, shard := range sm.shards {
        metrics := sm.getMetrics(shard.ID)
        if metrics.QPS > avgQPS * 2.0 || metrics.CPUUsage > 0.8 {
            hot = append(hot, shard)
        }
    }
    return hot
}

Live migration:

func (sm *ShardManager) splitShard(shard *Shard) error {
    midpoint := (shard.RangeStart + shard.RangeEnd) / 2

    // Create new shard
    newShard := &Shard{
        ID:         uuid.New().String(),
        RangeStart: midpoint,
        RangeEnd:   shard.RangeEnd,
        State:      MIGRATING,
    }

    // Phase 1: Enable double-write
    sm.enableDoubleWrite(shard, newShard)

    // Phase 2: Backfill existing data
    cursor := shard.Scan(midpoint, shard.RangeEnd)
    for cursor.Next() {
        key, value := cursor.KeyValue()
        newShard.Put(key, value)

        // Throttle to limit impact
        time.Sleep(time.Microsecond * 100)
    }

    // Phase 3: Verify
    if !sm.verify(shard, newShard, midpoint) {
        return errors.New("verification failed")
    }

    // Phase 4: Atomic cutover
    sm.updateRouting(func(r *Routing) {
        shard.RangeEnd = midpoint
        r.AddShard(newShard)
    })

    // Phase 5: Cleanup. Note shard.RangeEnd was already shrunk to midpoint
    // during cutover, so delete the migrated range via the new shard's bounds.
    shard.DeleteRange(midpoint, newShard.RangeEnd)
    newShard.State = ACTIVE
    sm.disableDoubleWrite(shard, newShard)

    return nil
}
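
While double-write is active, writes that fall in the range being moved must land on both the source and the destination shard, or the backfill will race with new data. A hedged sketch of the write path (the Migration fields and the lookup helper are assumptions, not defined in the hints above):

// Migration describes an in-flight split (assumed shape).
type Migration struct {
    Source   *Shard
    NewShard *Shard
}

// Put routes a write and duplicates it to the destination shard while a
// migration covering the key is in progress.
func (sm *ShardManager) Put(key uint64, value []byte) error {
    shard := sm.lookup(key) // assumed range-lookup helper
    if err := shard.Put(key, value); err != nil {
        return err
    }
    for _, m := range sm.migrations {
        if m.Source == shard && key >= m.NewShard.RangeStart && key < m.NewShard.RangeEnd {
            return m.NewShard.Put(key, value) // keeps the new shard consistent during backfill
        }
    }
    return nil
}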

Questions to guide implementation:

  1. How do you handle writes to keys being migrated?
  2. What if the migration takes longer than expected?
  3. How do you handle a node failure during migration?
  4. When should you merge shards instead of splitting them?

Learning milestones:

  1. Hot shard detection works → You understand monitoring
  2. Manual split works → You understand the migration protocol
  3. Automatic rebalancing triggers → You understand policy-based operations
  4. Live migration doesn’t drop requests → You’ve built a production-grade feature

Project 7: Implement Raft Consensus for Leader Election

  • File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
  • Main Programming Language: Go
  • Alternative Programming Languages: Rust
  • Coolness Level: Level 5: Pure Magic
  • Business Potential: 4. The “Open Core” Infrastructure
  • Difficulty: Level 5: Master
  • Knowledge Area: Distributed Consensus / Raft
  • Software or Tool: Raft implementation
  • Main Book: “In Search of an Understandable Consensus Algorithm” (Raft paper)

What you’ll build: A Raft consensus implementation that can be used for leader election and replicated state machines, the foundation of distributed databases like CockroachDB and etcd.

Why it teaches sharding: Modern distributed databases use Raft (or Paxos) for replication within each shard. Understanding Raft means understanding how databases achieve strong consistency in a distributed environment.

Core challenges you’ll face:

  • Implementing leader election → maps to distributed coordination
  • Log replication → maps to consistent replication
  • Handling network partitions → maps to split-brain prevention
  • Log compaction (snapshots) → maps to memory management

Key Concepts:

  • Raft paper: “In Search of an Understandable Consensus Algorithm” by Ongaro & Ousterhout
  • Terms and elections: How leaders are chosen
  • Log matching: Ensuring all nodes have the same log
  • Safety guarantees: What Raft guarantees and what it doesn’t

Difficulty: Master
Time estimate: 4-6 weeks
Prerequisites: Projects 4-5 completed, deep understanding of distributed systems

Real world outcome:

$ ./raft-cluster --nodes 5
[Node1] Starting as FOLLOWER, term=0
[Node2] Starting as FOLLOWER, term=0
[Node3] Starting as FOLLOWER, term=0
[Node4] Starting as FOLLOWER, term=0
[Node5] Starting as FOLLOWER, term=0

# Election timeout on Node3
[Node3] Election timeout, becoming CANDIDATE, term=1
[Node3] Requesting votes...
[Node1] Granting vote to Node3 for term=1
[Node2] Granting vote to Node3 for term=1
[Node4] Granting vote to Node3 for term=1
[Node3] Received 4/5 votes, becoming LEADER for term=1

# Client submits a command
$ ./raft-client --leader node3:8000
raft> SET x = 10
[Node3] Appending to log: index=1, term=1, cmd="SET x = 10"
[Node3] Replicating to followers...
[Node1] Appended entry, log length=1
[Node2] Appended entry, log length=1
[Node4] Appended entry, log length=1
[Node3] Entry committed (3/5 replicas), applying to state machine
OK

raft> GET x
10

# Simulate leader failure
$ kill -9 $(pgrep -f "raft.*node3")

[Node1] Leader heartbeat timeout
[Node5] Leader heartbeat timeout
[Node2] Election timeout, becoming CANDIDATE, term=2
[Node1] Granting vote to Node2 for term=2
[Node4] Granting vote to Node2 for term=2
[Node5] Granting vote to Node2 for term=2
[Node2] Received 4/4 votes, becoming LEADER for term=2

# New leader has all committed entries
$ ./raft-client --leader node2:8000
raft> GET x
10

raft> SET y = 20
OK

# View cluster status
raft> STATUS
Cluster Status
==============
Leader: Node2 (term=2)

Node    State      Log Length  Commit Index
----    -----      ----------  ------------
Node1   Follower   2           2
Node2   Leader     2           2
Node3   (down)
Node4   Follower   2           2
Node5   Follower   2           2

Implementation Hints:

Node state:

type NodeState int
const (
    Follower NodeState = iota
    Candidate
    Leader
)

type RaftNode struct {
    // Persistent state
    currentTerm int
    votedFor    string
    log         []LogEntry

    // Volatile state
    commitIndex int
    lastApplied int

    // Leader state
    nextIndex   map[string]int
    matchIndex  map[string]int

    state       NodeState
    leaderId    string
}

Leader election:

func (n *RaftNode) startElection() {
    n.currentTerm++
    n.state = Candidate
    n.votedFor = n.id
    votes := 1  // Vote for self
    // NOTE: votes is updated from multiple goroutines below; a real
    // implementation needs a mutex or atomic counter, and must confirm it is
    // still a candidate in the same term before becoming leader.

    for _, peer := range n.peers {
        go func(p string) {
            reply := n.sendRequestVote(p, &RequestVoteArgs{
                Term:         n.currentTerm,
                CandidateId:  n.id,
                LastLogIndex: len(n.log) - 1,
                LastLogTerm:  n.log[len(n.log)-1].Term,
            })

            if reply.VoteGranted {
                votes++
                if votes > len(n.peers)/2 {
                    n.becomeLeader()
                }
            }
        }(peer)
    }
}

Log replication:

func (n *RaftNode) appendEntries(peer string) {
    prevLogIndex := n.nextIndex[peer] - 1
    entries := n.log[n.nextIndex[peer]:]

    reply := n.sendAppendEntries(peer, &AppendEntriesArgs{
        Term:         n.currentTerm,
        LeaderId:     n.id,
        PrevLogIndex: prevLogIndex,
        PrevLogTerm:  n.log[prevLogIndex].Term,
        Entries:      entries,
        LeaderCommit: n.commitIndex,
    })

    if reply.Success {
        n.matchIndex[peer] = prevLogIndex + len(entries)
        n.nextIndex[peer] = n.matchIndex[peer] + 1
        n.updateCommitIndex()
    } else {
        n.nextIndex[peer]--  // Backtrack and retry
    }
}
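
Randomized election timeouts (question 1 below) are what keep candidates from endlessly splitting the vote. A minimal sketch of the follower's timer loop, assuming resetCh is signalled on every valid heartbeat or granted vote:

import (
    "math/rand"
    "time"
)

// runElectionTimer waits a randomized interval (here 150-300 ms). If no
// heartbeat resets it in time, the node starts an election. Randomization
// makes it unlikely that two followers time out at the same moment.
func (n *RaftNode) runElectionTimer(resetCh <-chan struct{}) {
    for {
        timeout := 150*time.Millisecond + time.Duration(rand.Intn(150))*time.Millisecond
        select {
        case <-resetCh:
            // Heartbeat or vote granted: remain a follower, restart the timer.
        case <-time.After(timeout):
            n.startElection()
        }
    }
}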

Questions to guide implementation:

  1. Why does Raft use randomized election timeouts?
  2. What’s the “log matching property” and why is it important?
  3. How does Raft handle a “split vote”?
  4. Why can’t a leader commit entries from previous terms directly?

Learning milestones:

  1. Leader election works → You understand the election protocol
  2. Log replication works → You understand the AppendEntries RPC
  3. System survives leader failure → You understand fault tolerance
  4. You pass the Raft paper’s tests → You have a correct implementation

Project 8: Build a Distributed SQL Query Engine

  • File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
  • Main Programming Language: Go
  • Alternative Programming Languages: Rust, Java
  • Coolness Level: Level 5: Pure Magic
  • Business Potential: 4. The “Open Core” Infrastructure
  • Difficulty: Level 5: Master
  • Knowledge Area: Query Processing / Distributed SQL
  • Software or Tool: SQL parser, query optimizer
  • Main Book: “Database Internals” by Alex Petrov

What you’ll build: A distributed SQL query engine that can parse SQL, create distributed query plans, and execute queries across multiple shards with proper aggregation.

Why it teaches sharding: The hardest part of sharding isn’t data distribution—it’s making it transparent to applications. Distributed SQL engines like CockroachDB, Vitess, and Citus solve this. Understanding query distribution teaches you why these systems are valuable and how they work.

Core challenges you’ll face:

  • Parsing and planning SQL → maps to query processing fundamentals
  • Distributing queries to shards → maps to query routing
  • Handling JOINs across shards → maps to distributed joins
  • Aggregating results → maps to parallel aggregation

Key Concepts:

  • Query planning: “Database Internals” Chapters on query processing
  • Distributed joins: Hash join, broadcast join, collocated join
  • Push-down optimization: Execute as much as possible on shards
  • Result aggregation: Combining partial results correctly

Difficulty: Master
Time estimate: 6+ weeks
Prerequisites: All previous projects, SQL internals knowledge

Real world outcome:

$ ./distsql --shards "shard1:5432,shard2:5432,shard3:5432"
[INFO] Distributed SQL Engine started
[INFO] Connected to 3 shards
[INFO] Schema synchronized

distsql> EXPLAIN SELECT * FROM users WHERE user_id = 12345;
Query Plan
==========
→ SingleShardQuery
    Shard: shard1 (user_id 12345 routes here)
    Query: SELECT * FROM users WHERE user_id = 12345

distsql> EXPLAIN SELECT COUNT(*), AVG(age) FROM users;
Query Plan
==========
→ Aggregate (final)
    Functions: SUM(partial_count), SUM(partial_sum)/SUM(partial_count_age)
    ├── ScatterGather
    │   ├── Shard1: SELECT COUNT(*) as partial_count, SUM(age) as partial_sum, COUNT(age) as partial_count_age FROM users
    │   ├── Shard2: SELECT COUNT(*) as partial_count, SUM(age) as partial_sum, COUNT(age) as partial_count_age FROM users
    │   └── Shard3: SELECT COUNT(*) as partial_count, SUM(age) as partial_sum, COUNT(age) as partial_count_age FROM users

distsql> EXPLAIN SELECT u.name, COUNT(o.id) FROM users u JOIN orders o ON u.user_id = o.user_id GROUP BY u.user_id;
Query Plan
==========
→ Aggregate (final)
    Group by: user_id
    ├── ScatterGather (collocated join - both tables sharded by user_id)
    │   ├── Shard1: SELECT u.name, COUNT(o.id) FROM users u JOIN orders o ON u.user_id = o.user_id WHERE u.user_id BETWEEN 0 AND 1000000 GROUP BY u.user_id
    │   ├── Shard2: ...
    │   └── Shard3: ...

distsql> SELECT u.name, COUNT(o.id) as order_count
         FROM users u
         JOIN orders o ON u.user_id = o.user_id
         GROUP BY u.user_id
         ORDER BY order_count DESC
         LIMIT 10;

[Executing on 3 shards...]
[Shard1] 2,345 rows scanned, 450 groups
[Shard2] 2,412 rows scanned, 467 groups
[Shard3] 2,301 rows scanned, 443 groups
[Merging results...]

name          | order_count
--------------+-------------
John Smith    | 156
Jane Doe      | 142
Bob Wilson    | 138
...

10 rows (45ms, 3 shards queried)

Implementation Hints:

Query plan representation:

type PlanNode interface {
    Execute() ([]Row, error)
    Children() []PlanNode
}

type ScatterGather struct {
    shards    []Shard
    subquery  string
    children  []PlanNode
}

type Aggregate struct {
    functions []AggregateFunc
    groupBy   []string
    child     PlanNode
}

type SingleShard struct {
    shard    Shard
    query    string
}

Query routing based on shard key:

func (p *Planner) planQuery(query *ParsedQuery) PlanNode {
    // Check if query has shard key in WHERE clause
    shardKey := p.extractShardKey(query.Where)

    if shardKey != nil {
        // Route to single shard
        shard := p.router.GetShard(shardKey)
        return &SingleShard{shard: shard, query: query.String()}
    }

    // Need to scatter to all shards
    return p.planScatterGather(query)
}

Distributed aggregation:

// For AVG, we need to compute SUM and COUNT on each shard,
// then combine: total_sum / total_count

func (p *Planner) rewriteForDistributedAgg(agg *Aggregate) *Aggregate {
    var partialFuncs []AggregateFunc
    var finalFuncs []AggregateFunc

    for _, f := range agg.functions {
        switch f.Name {
        case "COUNT":
            partialFuncs = append(partialFuncs, Count(f.Arg))
            finalFuncs = append(finalFuncs, Sum("partial_count"))
        case "SUM":
            partialFuncs = append(partialFuncs, Sum(f.Arg))
            finalFuncs = append(finalFuncs, Sum("partial_sum"))
        case "AVG":
            // Rewrite AVG(x) to SUM(x)/COUNT(x)
            partialFuncs = append(partialFuncs, Sum(f.Arg), Count(f.Arg))
            finalFuncs = append(finalFuncs, Divide(Sum("partial_sum"), Sum("partial_count")))
        }
    }
    // ... create two-phase aggregate plan
}
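
For ORDER BY ... LIMIT across shards (question 1 below), each shard can be asked for its local top N, and the coordinator only merges those partial lists and keeps the global top N. A hedged sketch:

import "sort"

// mergeTopN combines per-shard "top n" result sets into a global top n.
// Each shard must have applied the same ORDER BY and LIMIT n locally,
// otherwise rows belonging in the global top n could be missing.
func mergeTopN(partials [][]Row, n int, less func(a, b Row) bool) []Row {
    var all []Row
    for _, rows := range partials {
        all = append(all, rows...)
    }
    sort.Slice(all, func(i, j int) bool { return less(all[i], all[j]) })
    if len(all) > n {
        all = all[:n]
    }
    return all
}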

Questions to guide implementation:

  1. How do you handle ORDER BY with LIMIT across shards?
  2. What’s the difference between a collocated join and a broadcast join?
  3. How do you handle subqueries that cross shards?
  4. When is it better to pull data to a coordinator vs. push computation to shards?

Learning milestones:

  1. Simple queries route correctly → You understand query routing
  2. Aggregations work across shards → You understand distributed aggregation
  3. Collocated joins work → You understand join optimization
  4. Query plans are efficient → You understand push-down optimization

Project 9: Multi-Region Database with Conflict Resolution

  • File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
  • Main Programming Language: Go
  • Alternative Programming Languages: Rust
  • Coolness Level: Level 5: Pure Magic
  • Business Potential: 5. The “Industry Disruptor”
  • Difficulty: Level 5: Master
  • Knowledge Area: Multi-Region / Conflict Resolution
  • Software or Tool: CRDTs, vector clocks
  • Main Book: “Designing Data-Intensive Applications” by Martin Kleppmann

What you’ll build: A multi-region database that allows writes in multiple regions simultaneously, using vector clocks for causality tracking and CRDTs for conflict-free data types.

Why it teaches sharding: Global applications need to write data close to users for low latency. But this creates conflicts. Understanding multi-region replication and conflict resolution teaches you how systems like DynamoDB and CockroachDB handle global scale.

Core challenges you’ll face:

  • Implementing vector clocks → maps to causality tracking
  • Detecting and resolving conflicts → maps to concurrent writes
  • Implementing CRDTs → maps to conflict-free data types
  • Cross-region replication → maps to WAN replication

Key Concepts:

  • Vector clocks: “Designing Data-Intensive Applications” Chapter 5
  • CRDTs: Conflict-free Replicated Data Types
  • Last-writer-wins: Simple but lossy conflict resolution
  • Causal consistency: Preserving happened-before relationships

Difficulty: Master
Time estimate: 4-6 weeks
Prerequisites: Projects 4-7 completed

Real world outcome:

# Start multi-region cluster
$ ./multiregion --region us-west --peers "eu-west:8000,ap-east:8000"
[us-west] Multi-region database started
[us-west] Connected to peers: eu-west, ap-east

# Concurrent writes in different regions
# Region: us-west
uswest> SET user:123:name "Alice"
OK (version: {us-west: 1})

# Simultaneously in eu-west
euwest> SET user:123:name "Alicia"
OK (version: {eu-west: 1})

# After replication...
uswest> GET user:123:name
CONFLICT DETECTED!
  Version {us-west: 1}: "Alice"
  Version {eu-west: 1}: "Alicia"

Resolution strategy: last-writer-wins
  Winner: "Alicia" (eu-west timestamp later)

# Using CRDTs for conflict-free updates
uswest> COUNTER user:123:visits INCR 5
OK (version: {us-west: 1}, value: 5)

euwest> COUNTER user:123:visits INCR 3
OK (version: {eu-west: 1}, value: 3)

# After replication - no conflict!
uswest> COUNTER user:123:visits GET
8  (merged: 5 + 3)

# Using G-Set (grow-only set)
uswest> GSET user:123:tags ADD "premium"
OK

euwest> GSET user:123:tags ADD "verified"
OK

uswest> GSET user:123:tags MEMBERS
["premium", "verified"]  (union of both additions)

# View causality
uswest> HISTORY user:123:name
Version History (vector clock)
==============================
{us-west: 0, eu-west: 0}  (initial)
{us-west: 1, eu-west: 0}  "Alice" (us-west)
{us-west: 0, eu-west: 1}  "Alicia" (eu-west) [concurrent!]
{us-west: 1, eu-west: 1}  "Alicia" (resolved)

Implementation Hints:

Vector clock:

type VectorClock map[string]uint64

func (vc VectorClock) Increment(nodeId string) {
    vc[nodeId]++
}

func (vc VectorClock) Merge(other VectorClock) VectorClock {
    result := make(VectorClock)
    for k, v := range vc {
        result[k] = v
    }
    for k, v := range other {
        if v > result[k] {
            result[k] = v
        }
    }
    return result
}

func (vc VectorClock) Compare(other VectorClock) Ordering {
    // Returns: Before, After, Equal, or Concurrent
}
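
One way to fill in the Compare stub above, assuming a simple Ordering enum:

type Ordering int

const (
    Before Ordering = iota
    After
    Equal
    Concurrent
)

// Compare returns Before if vc happened-before other, After if other
// happened-before vc, Equal if they are identical, and Concurrent if neither
// dominates the other (a true conflict that needs resolution).
func (vc VectorClock) Compare(other VectorClock) Ordering {
    vcBehind, otherBehind := false, false
    for k, v := range vc {
        if v > other[k] {
            otherBehind = true
        } else if v < other[k] {
            vcBehind = true
        }
    }
    for k, v := range other {
        if v > vc[k] {
            vcBehind = true
        } else if v < vc[k] {
            otherBehind = true
        }
    }
    switch {
    case vcBehind && otherBehind:
        return Concurrent
    case vcBehind:
        return Before
    case otherBehind:
        return After
    default:
        return Equal
    }
}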

G-Counter CRDT:

type GCounter struct {
    counts map[string]uint64  // node -> count
}

func (g *GCounter) Increment(nodeId string, amount uint64) {
    g.counts[nodeId] += amount
}

func (g *GCounter) Value() uint64 {
    var total uint64
    for _, v := range g.counts {
        total += v
    }
    return total
}

func (g *GCounter) Merge(other *GCounter) {
    for node, count := range other.counts {
        if count > g.counts[node] {
            g.counts[node] = count
        }
    }
}

Last-writer-wins register:

// LWWRegister keeps whichever value carries the latest timestamp;
// ties are broken by node ID so every replica picks the same winner.
type LWWRegister struct {
    value     interface{}
    timestamp time.Time
    nodeId    string
}

func (r *LWWRegister) Set(value interface{}, ts time.Time, node string) {
    if ts.After(r.timestamp) || (ts.Equal(r.timestamp) && node > r.nodeId) {
        r.value = value
        r.timestamp = ts
        r.nodeId = node
    }
}

// Merge is just Set: the losing write is silently discarded, which is
// why last-writer-wins is simple but lossy.
func (r *LWWRegister) Merge(other *LWWRegister) {
    r.Set(other.value, other.timestamp, other.nodeId)
}
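
The transcript also exercises a grow-only set (GSET), which the hints above don't cover. A minimal sketch:

// GSet is a grow-only set: elements can be added but never removed,
// so merging two replicas is just a set union and can never conflict.
type GSet struct {
    members map[string]struct{}
}

func NewGSet() *GSet {
    return &GSet{members: make(map[string]struct{})}
}

func (s *GSet) Add(value string) {
    s.members[value] = struct{}{}
}

func (s *GSet) Members() []string {
    out := make([]string, 0, len(s.members))
    for v := range s.members {
        out = append(out, v)
    }
    return out
}

// Merge unions the replicas, which is how "premium" and "verified"
// end up together in the transcript above.
func (s *GSet) Merge(other *GSet) {
    for v := range other.members {
        s.members[v] = struct{}{}
    }
}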

Questions to guide implementation:

  1. What’s the difference between vector clocks and Lamport timestamps?
  2. When is last-writer-wins acceptable, and when is it dangerous?
  3. How do CRDTs achieve eventual consistency without coordination?
  4. What are the limitations of CRDTs?

Learning milestones:

  1. Vector clocks track causality → You understand happens-before
  2. Conflicts are detected → You understand concurrent writes
  3. CRDTs merge correctly → You understand conflict-free types
  4. Multi-region replication works → You’ve built a globally distributed system

Project 10: Production-Ready Sharded Database (Capstone)

  • File: LEARN_DATABASE_SHARDING_DEEP_DIVE.md
  • Main Programming Language: Go
  • Alternative Programming Languages: Rust
  • Coolness Level: Level 5: Pure Magic
  • Business Potential: 5. The “Industry Disruptor”
  • Difficulty: Level 5: Master
  • Knowledge Area: Complete Distributed Database
  • Software or Tool: All previous components
  • Main Book: All previous books combined

What you’ll build: A complete, production-ready sharded database combining all previous projects: sharding with consistent hashing, replication with Raft, distributed transactions, automatic rebalancing, and a SQL query engine.
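
One way to picture the integration is a node type that simply composes the pieces from the earlier projects. The sketch below is a rough assumption about the shape of those components (all interface and type names are hypothetical), not a prescribed design:

// Hypothetical component interfaces standing in for earlier projects.
type Ring interface {
    Lookup(key string) int // Project 2: key -> shard ID
}
type RaftGroup interface {
    Propose(cmds []string) error // Projects 4 & 7: replicate within one shard
}
type TxnCoordinator interface {
    Run(byShard map[int][]string) error // Project 5: two-phase commit
}

// ClusterNode wires the components together.
type ClusterNode struct {
    ring Ring
    raft map[int]RaftGroup // one consensus group per shard
    txns TxnCoordinator
}

// Write groups commands by owning shard, then either proposes them to a
// single Raft group (fast path) or runs cross-shard two-phase commit.
func (n *ClusterNode) Write(cmds map[string]string) error {
    byShard := map[int][]string{}
    for key, cmd := range cmds {
        shard := n.ring.Lookup(key)
        byShard[shard] = append(byShard[shard], cmd)
    }
    if len(byShard) == 1 {
        for shard, shardCmds := range byShard {
            return n.raft[shard].Propose(shardCmds)
        }
    }
    return n.txns.Run(byShard)
}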

Why this is the capstone: This project integrates everything you’ve learned. It’s what companies like CockroachDB, TiDB, and YugabyteDB have built. Completing this means you truly understand distributed databases at a deep level.

Core challenges you’ll face:

  • Integration complexity → maps to system design
  • Operational features → maps to production readiness
  • Performance optimization → maps to real-world constraints
  • Testing distributed systems → maps to correctness verification

Prerequisites: All previous projects completed

Real world outcome:

$ ./proddb init --nodes 9 --replication-factor 3
[INFO] Initializing production database cluster
[INFO] 9 nodes, 3-way replication per shard
[INFO] Initial shards: 3 (one Raft group of 3 nodes per shard)

Cluster Topology
================
Shard 1 (range: 0x0000-0x5555)
  ├── Node1 (leader, us-east-1a)
  ├── Node2 (follower, us-east-1b)
  └── Node3 (follower, us-east-1c)

Shard 2 (range: 0x5555-0xAAAA)
  ├── Node4 (leader, us-west-2a)
  ├── Node5 (follower, us-west-2b)
  └── Node6 (follower, us-west-2c)

Shard 3 (range: 0xAAAA-0xFFFF)
  ├── Node7 (leader, eu-west-1a)
  ├── Node8 (follower, eu-west-1b)
  └── Node9 (follower, eu-west-1c)

$ ./proddb sql
proddb> CREATE TABLE users (
          id BIGINT PRIMARY KEY,
          name TEXT,
          email TEXT,
          created_at TIMESTAMP
        ) SHARD BY HASH(id);

proddb> CREATE TABLE orders (
          id BIGINT PRIMARY KEY,
          user_id BIGINT REFERENCES users(id),
          amount DECIMAL,
          status TEXT
        ) SHARD BY HASH(user_id);  -- Collocated with users

proddb> INSERT INTO users VALUES (1, 'Alice', 'alice@example.com', NOW());
[Routed to Shard 2, committed via Raft]

proddb> BEGIN;
proddb> UPDATE users SET name = 'Alicia' WHERE id = 1;
proddb> INSERT INTO orders VALUES (100, 1, 99.99, 'pending');
proddb> COMMIT;
[Distributed transaction coordinated on Shard 2 (both rows collocated)]
[2PC: Prepare OK, Commit OK]

proddb> SELECT u.name, SUM(o.amount)
        FROM users u JOIN orders o ON u.id = o.user_id
        GROUP BY u.id;
[Distributed query, collocated join, 3 shards queried]
[Results merged at coordinator]

# Admin commands
$ ./proddb admin rebalance --check
Shard Balance Analysis
======================
Shard 1: 2.1M keys, 45% CPU
Shard 2: 2.3M keys, 52% CPU  ← slightly hot
Shard 3: 1.9M keys, 38% CPU

Recommendation: Split Shard 2

$ ./proddb admin rebalance --execute
[Starting automatic rebalancing...]
[Live migration in progress...]
[Rebalancing complete, 4 shards now]

$ ./proddb admin failover --simulate --node Node1
[Simulating Node1 failure...]
[Shard 1: Node2 elected as new leader (Raft term 5)]
[Cluster healthy, no data loss]

$ ./proddb benchmark --duration 60s --threads 100
Benchmark Results (60s, 100 threads)
====================================
Operations:     1,234,567
Throughput:     20,576 ops/sec
Latency P50:    2.3ms
Latency P99:    15.7ms
Latency P999:   45.2ms
Errors:         0
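
The topology printed by proddb init above assigns each shard a slice of a 16-bit hash space. A hedged sketch of how a router might resolve a primary key to one of those three ranges (the FNV hash and the exact boundary handling are assumptions):

import (
    "encoding/binary"
    "hash/fnv"
)

// shardFor hashes the primary key and picks the range that owns it,
// matching the 0x0000-0x5555 / 0x5555-0xAAAA / 0xAAAA-0xFFFF split above.
func shardFor(id uint64) int {
    h := fnv.New32a()
    var buf [8]byte
    binary.BigEndian.PutUint64(buf[:], id)
    h.Write(buf[:])
    prefix := uint16(h.Sum32() >> 16) // top 16 bits of the hash
    switch {
    case prefix < 0x5555:
        return 1
    case prefix < 0xAAAA:
        return 2
    default:
        return 3
    }
}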

This capstone integrates:

  1. Consistent hashing (Project 2) for shard placement
  2. Query routing (Project 1) for connection management
  3. Application sharding (Project 3) for data distribution
  4. Replication (Project 4) for durability
  5. Distributed transactions (Project 5) for ACID
  6. Auto-rebalancing (Project 6) for operations
  7. Raft consensus (Project 7) for leader election
  8. Distributed SQL (Project 8) for query execution

Project Comparison Table

Project                         Difficulty     Time        Depth of Understanding   Fun Factor
1. Query Router                 Intermediate   2 weeks     ⭐⭐⭐                      ⭐⭐⭐
2. Consistent Hashing           Intermediate   1 week      ⭐⭐⭐⭐                     ⭐⭐⭐
3. Application Sharding         Intermediate   2 weeks     ⭐⭐⭐⭐                     ⭐⭐⭐⭐
4. KV Store with Replication    Advanced       3 weeks     ⭐⭐⭐⭐                     ⭐⭐⭐⭐
5. Two-Phase Commit             Expert         3 weeks     ⭐⭐⭐⭐⭐                    ⭐⭐⭐
6. Auto-Rebalancing             Expert         4 weeks     ⭐⭐⭐⭐⭐                    ⭐⭐⭐⭐
7. Raft Consensus               Master         4-6 weeks   ⭐⭐⭐⭐⭐                    ⭐⭐⭐⭐⭐
8. Distributed SQL Engine       Master         6+ weeks    ⭐⭐⭐⭐⭐                    ⭐⭐⭐⭐⭐
9. Multi-Region + CRDTs         Master         4-6 weeks   ⭐⭐⭐⭐⭐                    ⭐⭐⭐⭐⭐
10. Capstone                    Master         8+ weeks    ⭐⭐⭐⭐⭐                    ⭐⭐⭐⭐⭐

Phase 1: Foundations (4-6 weeks)

  1. Project 1: Query Router - Understand database proxying
  2. Project 2: Consistent Hashing - Understand distributed key placement
  3. Project 3: Application Sharding - Implement manual sharding

Phase 2: Replication & Transactions (6-8 weeks)

  1. Project 4: Replicated KV Store - Understand replication
  2. Project 5: Two-Phase Commit - Understand distributed transactions

Phase 3: Advanced Operations (6-8 weeks)

  1. Project 6: Auto-Rebalancing - Understand live migration
  2. Project 7: Raft Consensus - Understand distributed consensus

Phase 4: Query Processing (6+ weeks)

  1. Project 8: Distributed SQL Engine - Understand query distribution

Phase 5: Global Scale (4-6 weeks)

  1. Project 9: Multi-Region with CRDTs - Understand global distribution

Phase 6: Integration (8+ weeks)

  1. Project 10: Production Database - Combine everything

Total estimated time: 9-12 months of focused learning


Essential Resources

Books (In Priority Order)

  1. “Designing Data-Intensive Applications” by Martin Kleppmann - THE book for this topic
  2. “Database Internals” by Alex Petrov - Deep dive into storage engines
  3. “Understanding Distributed Systems” by Roberto Vitillo - Practical introduction
  4. “High Performance MySQL” - Practical MySQL scaling

Papers

  • Raft: “In Search of an Understandable Consensus Algorithm”
  • Dynamo: “Dynamo: Amazon’s Highly Available Key-value Store”
  • Spanner: “Spanner: Google’s Globally Distributed Database”
  • CockroachDB: “CockroachDB: The Resilient Geo-Distributed SQL Database”

Online Resources

Tools to Study

  • Vitess - MySQL sharding
  • CockroachDB - Distributed SQL
  • TiDB - Distributed MySQL-compatible
  • etcd - Raft-based KV store

Summary

#    Project                                         Main Language
1    Query Router and Connection Pooler              Go
2    Consistent Hashing Implementation               Go
3    Application-Level Sharding Library              Python
4    Key-Value Store with Replication                Go
5    Two-Phase Commit Coordinator                    Go
6    Auto-Rebalancing Sharded Database               Go
7    Raft Consensus Implementation                   Go
8    Distributed SQL Query Engine                    Go
9    Multi-Region Database with CRDTs                Go
10   Production-Ready Sharded Database (Capstone)    Go

Final Notes

Database sharding and distributed databases represent some of the hardest problems in computer science. The challenges include:

  • Correctness: Distributed systems can fail in subtle ways
  • Performance: Network latency dominates everything
  • Operations: Running distributed systems is complex
  • Tradeoffs: There’s no perfect solution, only tradeoffs

The journey from understanding CAP theorem to building a production database is transformative. You’ll understand why:

  • Companies pay millions for CockroachDB and Spanner
  • “Just add more shards” is never simple
  • Distributed transactions are expensive
  • Eventual consistency is often the right choice

Start with Project 1 and the book “Designing Data-Intensive Applications.” By the time you finish, you’ll understand distributed databases at a level few engineers achieve.

Happy distributing! 🗄️

