Project 6: Lock-Free MPMC Queue

Quick Reference

Attribute              Details
Difficulty             Master
Time Estimate          3-4 weeks
Primary Language       C++
Alternative Languages  C, Rust
Knowledge Area         Advanced Lock-Free / High-Performance Systems
Tools Required         g++/clang++, ThreadSanitizer, perf, cachegrind
Primary Reference      “C++ Concurrency in Action, Second Edition” by Anthony Williams, Chapter 7.3
Prerequisites          Project 5 (Lock-Free Stack) completed. Deep understanding of memory ordering. Familiarity with cache architecture.

Learning Objectives

By completing this project, you will be able to:

  1. Design and implement a bounded lock-free MPMC queue using ring buffers and sequence numbers
  2. Coordinate multiple producers and consumers without locks using atomic compare-and-swap operations
  3. Apply the sequence number trick to safely hand off slots between producers and consumers
  4. Eliminate false sharing through cache-line alignment and padding
  5. Achieve high throughput (8M+ items/second) through contention minimization
  6. Verify lock-free guarantees and understand the difference from wait-free
  7. Benchmark and compare your implementation against mutex-based and industry alternatives
  8. Debug subtle concurrency bugs using memory sanitizers and formal reasoning

Theoretical Foundation

Core Concepts

What is an MPMC Queue?

An MPMC (Multi-Producer Multi-Consumer) queue allows:

  • Multiple threads to enqueue items concurrently (producers)
  • Multiple threads to dequeue items concurrently (consumers)
  • No mutual exclusion between operations (lock-free)

This is the “final boss” of lock-free data structures because it requires coordinating four types of concurrent operations:

  1. Producer vs Producer (who claims which slot?)
  2. Consumer vs Consumer (who claims which item?)
  3. Producer vs Consumer at same slot (is data ready?)
  4. Wraparound (when buffer is full/empty)

Traditional Mutex-Based Queue:

     ┌─────────────────────────────────────────┐
     │                                         │
  P1─┼──┐                                      │
  P2─┼──┼──► [LOCK] ──► QUEUE ──► [LOCK] ──►──┼──C1
  P3─┼──┘                                      ├──C2
     │    All operations serialize on lock     │
     └─────────────────────────────────────────┘

Lock-Free MPMC Queue:

     ┌─────────────────────────────────────────┐
     │                                         │
  P1─┼──────────────────┐                      │
  P2─┼──────────────────┼──► RING BUFFER ──►──┼──C1
  P3─┼──────────────────┘     (per-slot       ├──C2
     │                         sequencing)     │
     │    Operations proceed in parallel       │
     └─────────────────────────────────────────┘

Sequence Numbers: The Key Insight

The fundamental trick that makes lock-free MPMC queues work is the sequence number per slot. Each slot in the ring buffer has an atomic sequence number that acts as a “turn indicator”:

Sequence Number State Machine:

                    ┌─────────────────────────────┐
                    │                             │
                    ▼                             │
    ┌───────────────────────────────┐            │
    │ sequence == position          │ ───────────┘
    │ "Slot ready for producer"     │
    └───────────────┬───────────────┘
                    │
                    │ Producer stores data,
                    │ sets sequence = position + 1
                    │
                    ▼
    ┌───────────────────────────────┐
    │ sequence == position + 1      │
    │ "Slot ready for consumer"     │
    └───────────────┬───────────────┘
                    │
                    │ Consumer loads data,
                    │ sets sequence = position + buffer_size
                    │
                    ▼
    ┌───────────────────────────────┐
    │ sequence == next_position     │ ──► Next cycle begins
    │ "Slot ready for next producer"│
    └───────────────────────────────┘

For a buffer of size N and position P:

  • sequence == P: Slot is empty and ready for producer claiming position P
  • sequence == P + 1: Slot contains data for consumer claiming position P
  • sequence == P + N: Slot is ready for producer claiming position P + N (next cycle)
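
To make the three states concrete, here is a tiny illustrative helper (the names are made up for this example and are not part of the queue itself) that classifies a slot for a thread operating at position P:

#include <cstddef>

enum class SlotState { ReadyForProducer, ReadyForConsumer, RecycledForNextCycle, InProgress };

// seq is the slot's current sequence value, pos the position a thread has claimed,
// and N the buffer capacity.
SlotState classify(size_t seq, size_t pos, size_t N) {
    if (seq == pos)     return SlotState::ReadyForProducer;      // empty, producer's turn
    if (seq == pos + 1) return SlotState::ReadyForConsumer;      // filled, consumer's turn
    if (seq == pos + N) return SlotState::RecycledForNextCycle;  // already handed to the next lap
    return SlotState::InProgress;                                // another thread is mid-operation
}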

Bounded Ring Buffer Structure

The queue uses a ring buffer with power-of-2 size for fast modular arithmetic:

Ring Buffer Layout (size = 8):

Index:    0     1     2     3     4     5     6     7
        ┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐
Slot:   │ S0  │ S1  │ S2  │ S3  │ S4  │ S5  │ S6  │ S7  │
        └──▲──┴─────┴──▲──┴─────┴─────┴─────┴──▲──┴─────┘
           │           │                       │
      dequeue_pos   enqueue_pos            wrap point
         = 8          = 10                 (pos & 7)
         (8 & 7 = 0)  (10 & 7 = 2)

Position → Index mapping: index = position & (size - 1)
                         index = position & mask

Why power of 2?
  position % size  = SLOW (division)
  position & mask  = FAST (single AND instruction)
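
A throwaway snippet (not part of the queue) makes the mapping easy to verify for the size-8 example above:

#include <cstddef>
#include <cstdio>

int main() {
    constexpr size_t size = 8;           // power of 2
    constexpr size_t mask = size - 1;    // 0b111
    static_assert((size & (size - 1)) == 0, "size must be a power of 2");

    const size_t positions[] = {8, 9, 10, 15, 16};
    for (size_t pos : positions) {
        // Positions grow without bound; only the index wraps.
        std::printf("position %zu -> index %zu\n", pos, pos & mask);
    }
}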

Cache-Line Alignment

Modern CPUs transfer memory in 64-byte cache lines. If two variables share a cache line, a write to either one invalidates the whole line in every other core’s cache (false sharing):

False Sharing Problem:

┌─────────────────────────────────────────────────────────────────┐
│ Cache Line 0 (64 bytes)                                         │
│ ┌────────────────────┬────────────────────┬──────────────────┐ │
│ │ enqueue_pos (8B)   │ dequeue_pos (8B)   │ padding (48B)    │ │
│ └────────┬───────────┴─────────┬──────────┴──────────────────┘ │
│          │                     │                                │
│          │ Producer writes     │ Consumer writes                │
│          │ here constantly     │ here constantly                │
│          │                     │                                │
│          └──────────┬──────────┘                                │
│                     │                                           │
│              CACHE LINE PING-PONG!                              │
│              Every write invalidates the other                  │
└─────────────────────────────────────────────────────────────────┘

Solution: Separate Cache Lines

┌─────────────────────────────────────────────────────────────────┐
│ Cache Line 0 (64 bytes)                                         │
│ ┌────────────────────┬──────────────────────────────────────┐  │
│ │ enqueue_pos (8B)   │ padding (56B)                        │  │
│ └────────────────────┴──────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ Cache Line 1 (64 bytes)                                         │
│ ┌────────────────────┬──────────────────────────────────────┐  │
│ │ dequeue_pos (8B)   │ padding (56B)                        │  │
│ └────────────────────┴──────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘

Now producers and consumers don't interfere!
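
A quick compile-time check (a sketch with assumed names, not the final queue class) confirms that alignas really pushes the two counters onto separate cache lines:

#include <atomic>
#include <cstddef>

struct PaddedCounters {
    alignas(64) std::atomic<size_t> enqueue_pos{0};   // starts at offset 0
    alignas(64) std::atomic<size_t> dequeue_pos{0};   // forced to offset 64
};

// With per-member alignment the struct must span at least two full cache lines.
static_assert(sizeof(PaddedCounters) >= 128, "counters would share a cache line");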

Why This Matters

MPMC queues are the communication backbone of every major concurrent system:

System Use Case Why MPMC?
Tokio (Rust) Task queue Distribute async tasks to worker threads
Go Scheduler Run queue Balance goroutines across OS threads
TBB (Intel) Work stealing Share tasks between thread pools
LMAX Disruptor Event processing Ultra-low-latency financial systems
Game Engines Job systems Parallel physics, rendering, AI

Performance impact:

Throughput Comparison (8 threads, 64-slot queue):

std::queue + mutex:     1.2M items/second  ████
boost::lockfree::queue: 7.5M items/second  ██████████████████████████████████
Well-tuned MPMC:        8.9M items/second  ████████████████████████████████████████

                        0    2    4    6    8    10 (millions/second)

The difference between 1.2M and 8.9M is a 7.4x speedup. In systems processing billions of messages daily, this translates to massive infrastructure cost savings.

Historical Context

The modern bounded MPMC queue design evolved through decades of research:

1990s: Lock-Free Foundations

  • Maurice Herlihy proves universality of compare-and-swap (CAS)
  • Michael & Scott develop classic lock-free queue (unbounded, single-word CAS)

2000s: Practical Implementations

  • Intel TBB concurrent_queue brings lock-free to mainstream
  • LMAX Disruptor shows cache-aware design can achieve sub-microsecond latency

2010s: Bounded MPMC Refinement

  • Facebook Folly MPMCQueue achieves predictable memory usage
  • Dmitry Vyukov’s MPMC designs influence Rust crossbeam and Go channels
  • rigtorp/MPMCQueue demonstrates minimal, high-performance C++ implementation

2020s: Hardware-Aware Optimization

  • ARM memory model considerations (weaker than x86)
  • NUMA-aware queue designs for multi-socket systems
  • Seqlock-inspired techniques for read-heavy workloads

Common Misconceptions

Misconception 1: “Lock-free means faster”

Reality: Lock-free guarantees progress (no deadlock), not speed. A well-tuned mutex can outperform a poorly designed lock-free structure.

Lock-free wins when:

  • High contention (many threads fighting for access)
  • Short critical sections (mutex overhead dominates)
  • Real-time requirements (bounded worst-case latency)

Lock-free loses when:

  • Low contention (mutex barely ever waits)
  • Complex operations (CAS retry loops waste cycles)
  • Single producer/consumer (simpler designs exist)

Misconception 2: “Just use atomic and you’re done”

Reality: Making individual operations atomic doesn’t make the algorithm correct. The MPMC queue requires carefully ordered sequences of operations:

// WRONG: Each operation is atomic, but together they're broken
void broken_enqueue(T value) {
    size_t pos = enqueue_pos.fetch_add(1);           // 1. Claim position
    buffer[pos % size] = value;                       // 2. Store data
    // No synchronization! Consumer might see garbage!
}

// RIGHT: Sequence number coordinates handoff
void correct_enqueue(T value) {
    size_t pos = enqueue_pos.fetch_add(1);           // 1. Claim position
    Slot& slot = buffer[pos & mask];
    while (slot.sequence.load(acquire) != pos) { }   // 2. Wait for slot
    slot.data = value;                                // 3. Store data
    slot.sequence.store(pos + 1, release);           // 4. Signal consumer
}

Misconception 3: “ABA problem is only for pointers”

Reality: The ABA problem can occur with any value that cycles. In MPMC queues, we avoid it by:

  • Using monotonically increasing positions (never reused)
  • Sequence numbers that encode state, not identity

Misconception 4: “x86 doesn’t need memory ordering”

Reality: x86 has a strong memory model (TSO) that prevents many reorderings, BUT:

  • Compiler reorderings still happen
  • Atomic operations need correct memory order for correctness
  • Code must work on ARM (phones, Apple Silicon, AWS Graviton)

// This "works" on x86 by accident, but breaks on ARM:
sequence.store(pos + 1, relaxed);  // Consumer might not see data!

// Correct on all platforms:
sequence.store(pos + 1, release);  // Data writes happen-before this

Project Specification

What You’re Building

A production-quality bounded lock-free MPMC queue library with:

  1. Core queue implementation with configurable capacity
  2. Thread-safe enqueue and dequeue operations
  3. Try variants that don’t block
  4. Comprehensive benchmarks comparing against alternatives
  5. Stress tests for correctness verification

Functional Requirements

# Build and run benchmarks
$ mkdir build && cd build
$ cmake .. && make
$ ./mpmc_benchmark

# Run stress tests
$ ./mpmc_stress --threads 16 --duration 60

# Run correctness tests
$ ./mpmc_tests

Expected Output

$ ./mpmc_benchmark

Queue Configuration:
  Capacity: 65536 slots
  Slot size: 64 bytes (cache-line aligned)
  Producers: 4
  Consumers: 4

Throughput test (10 seconds):
  Total items transferred: 89,000,000
  Throughput: 8.9M items/second
  Avg latency: 450ns (producer enqueue to consumer dequeue)
  P99 latency: 1.2us

Comparison:
  std::queue + std::mutex: 1.2M items/second
  Boost.Lockfree MPMC: 7.5M items/second
  Our implementation: 8.9M items/second

Stress test (vary producer/consumer ratios):
  1P/4C: 3.2M items/sec (producer-bound)
  4P/1C: 3.1M items/sec (consumer-bound)
  4P/4C: 8.9M items/sec (balanced)
  8P/8C: 12.1M items/sec (scales with cores)

Correctness tests:
  All items delivered exactly once: PASS
  FIFO order per-producer: PASS
  No deadlocks after 1 hour: PASS

Non-Functional Requirements

Requirement Target
Throughput > 8M items/second (4P/4C)
Latency (avg) < 500ns enqueue-to-dequeue
Latency (P99) < 2us
Memory Fixed allocation (no dynamic alloc in steady state)
Correctness Zero data loss, no duplicates
Progress Lock-free (starvation possible, deadlock impossible)

Solution Architecture

High-Level Design

┌──────────────────────────────────────────────────────────────────────────┐
│                           MPMCQueue<T, Size>                             │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                    Cache Line 0 (64 bytes)                        │   │
│  │  ┌─────────────────────────┬────────────────────────────────────┐│   │
│  │  │ enqueue_pos: atomic<u64>│           padding                  ││   │
│  │  │ (producers claim here)  │                                    ││   │
│  │  └─────────────────────────┴────────────────────────────────────┘│   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                                                          │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                    Cache Line 1 (64 bytes)                        │   │
│  │  ┌─────────────────────────┬────────────────────────────────────┐│   │
│  │  │ dequeue_pos: atomic<u64>│           padding                  ││   │
│  │  │ (consumers claim here)  │                                    ││   │
│  │  └─────────────────────────┴────────────────────────────────────┘│   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                                                          │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                    Ring Buffer (Size slots)                       │   │
│  │                                                                   │   │
│  │   Slot 0      Slot 1      Slot 2      ...      Slot N-1          │   │
│  │  ┌────────┐  ┌────────┐  ┌────────┐          ┌────────┐         │   │
│  │  │seq: 0  │  │seq: 1  │  │seq: 2  │    ...   │seq: N-1│         │   │
│  │  │data: T │  │data: T │  │data: T │          │data: T │         │   │
│  │  │padding │  │padding │  │padding │          │padding │         │   │
│  │  └────────┘  └────────┘  └────────┘          └────────┘         │   │
│  │     64B         64B         64B                 64B              │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                                                          │
├──────────────────────────────────────────────────────────────────────────┤
│  Operations:                                                             │
│  - enqueue(T) : void         (blocking push)                            │
│  - try_enqueue(T) : bool     (non-blocking push)                        │
│  - dequeue() : T             (blocking pop)                             │
│  - try_dequeue(T&) : bool    (non-blocking pop)                         │
│  - size_approx() : size_t    (approximate count)                        │
└──────────────────────────────────────────────────────────────────────────┘

Key Components

1. Slot Structure

Each slot in the ring buffer contains:

struct alignas(64) Slot {  // Aligned to cache line
    std::atomic<size_t> sequence;  // Turn indicator
    T data;                         // Actual payload
    // Implicit padding to 64 bytes
};

Why cache-line alignment?

  • Prevents false sharing between adjacent slots
  • Different threads can work on different slots without cache invalidation
  • Critical for scaling to many cores

2. Position Counters

Two atomic counters track the next position for producers and consumers:

alignas(64) std::atomic<size_t> enqueue_pos{0};  // Own cache line
alignas(64) std::atomic<size_t> dequeue_pos{0};  // Own cache line

These counters monotonically increase, wrapping around via the mask:

  • actual_index = position & (buffer_size - 1)

3. Sequence Number Protocol

The sequence number in each slot coordinates the producer-consumer handoff:

Initial state (empty queue of size 4):
Position:  enqueue_pos = 0, dequeue_pos = 0

Slot[0].sequence = 0   (ready for producer at pos 0)
Slot[1].sequence = 1   (ready for producer at pos 1)
Slot[2].sequence = 2   (ready for producer at pos 2)
Slot[3].sequence = 3   (ready for producer at pos 3)

After producer claims position 0 and stores data:
Slot[0].sequence = 1   (ready for consumer at pos 0)

After consumer claims position 0 and loads data:
Slot[0].sequence = 4   (ready for producer at pos 4 = next cycle)

Data Structures

#include <atomic>
#include <vector>
#include <cstdint>
#include <new>  // For hardware_destructive_interference_size

// Cache line size (typically 64 bytes)
#ifdef __cpp_lib_hardware_interference_size
    constexpr size_t CACHE_LINE = std::hardware_destructive_interference_size;
#else
    constexpr size_t CACHE_LINE = 64;
#endif

template<typename T, size_t Capacity>
class MPMCQueue {
    static_assert((Capacity & (Capacity - 1)) == 0,
                  "Capacity must be power of 2");
    static_assert(Capacity >= 2, "Capacity must be at least 2");

private:
    struct alignas(CACHE_LINE) Slot {
        std::atomic<size_t> sequence;
        T data;

        // Padding is implicit due to alignas
    };

    static constexpr size_t mask_ = Capacity - 1;

    // Each on its own cache line to prevent false sharing
    alignas(CACHE_LINE) std::atomic<size_t> enqueue_pos_{0};
    alignas(CACHE_LINE) std::atomic<size_t> dequeue_pos_{0};

    // The ring buffer
    alignas(CACHE_LINE) Slot buffer_[Capacity];

public:
    MPMCQueue();

    void enqueue(const T& value);
    void enqueue(T&& value);
    bool try_enqueue(const T& value);
    bool try_enqueue(T&& value);

    T dequeue();
    bool try_dequeue(T& value);

    size_t size_approx() const;
    bool empty() const;
    static constexpr size_t capacity() { return Capacity; }
};
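
To make the interface concrete before diving into the algorithms, here is a minimal usage sketch; it assumes the blocking enqueue/dequeue behave as specified below and exists only to show the calling pattern:

#include <cstdio>
#include <thread>

int main() {
    MPMCQueue<int, 1024> q;

    std::thread producer([&] {
        for (int i = 0; i < 100; ++i) q.enqueue(i);       // blocks only if the queue is full
    });
    std::thread consumer([&] {
        long long sum = 0;
        for (int i = 0; i < 100; ++i) sum += q.dequeue(); // blocks only if the queue is empty
        std::printf("sum = %lld\n", sum);                 // expect 4950
    });

    producer.join();
    consumer.join();
}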

Producer Algorithm (Enqueue)

enqueue(value):
    1. pos = enqueue_pos.fetch_add(1, relaxed)     // Claim a position
    2. slot = buffer[pos & mask]                    // Find slot
    3. while slot.sequence.load(acquire) != pos:   // Wait until slot ready
           pause/yield                              // Spin-wait hint
    4. slot.data = move(value)                      // Store data
    5. slot.sequence.store(pos + 1, release)        // Signal consumer

Sequence == pos means:
  "This slot is empty and the producer at position 'pos' can use it"

Setting sequence = pos + 1 means:
  "Data is now stored; consumer at position 'pos' can take it"

Consumer Algorithm (Dequeue)

dequeue():
    1. pos = dequeue_pos.fetch_add(1, relaxed)          // Claim a position
    2. slot = buffer[pos & mask]                         // Find slot
    3. while slot.sequence.load(acquire) != pos + 1:    // Wait for data
           pause/yield                                   // Spin-wait hint
    4. value = move(slot.data)                           // Load data
    5. slot.sequence.store(pos + mask + 1, release)      // Ready for next producer
    6. return value

Sequence == pos + 1 means:
  "This slot has data for consumer at position 'pos'"

Setting sequence = pos + mask + 1 means:
  "Slot is now empty; producer at position 'pos + Capacity' can use it"
  (This is the next cycle for this slot index)

Memory Ordering Analysis

Operation                   Memory Order   Why
fetch_add on position       relaxed        Just need atomicity; ordering comes from sequence
sequence.load in spin-wait  acquire        Need to see data stored before sequence update
sequence.store after data   release        Make data visible before signaling

The acquire-release pairs create synchronization:

  • Producer’s store(release) synchronizes-with Consumer’s load(acquire)
  • This guarantees the consumer sees the data the producer wrote
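
The same pairing can be seen in isolation with a two-thread sketch (a standalone example, deliberately simpler than the queue):

#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>

int payload = 0;                 // plain, non-atomic data
std::atomic<size_t> seq{0};      // plays the role of the slot sequence

int main() {
    std::thread producer([] {
        payload = 42;                              // 1. write the data
        seq.store(1, std::memory_order_release);   // 2. publish with release
    });
    std::thread consumer([] {
        while (seq.load(std::memory_order_acquire) != 1) {}  // 3. acquire pairs with the release
        assert(payload == 42);                     // 4. the data write is guaranteed visible
    });
    producer.join();
    consumer.join();
}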

Implementation Guide

Phase 1: Core Data Structures (Days 1-3)

Goal: Set up the slot structure and ring buffer with correct alignment.

Steps:

  1. Define the cache line constant:
    #ifdef __cpp_lib_hardware_interference_size
     constexpr size_t CACHE_LINE = std::hardware_destructive_interference_size;
    #else
     constexpr size_t CACHE_LINE = 64;
    #endif
    
  2. Create the Slot structure with proper alignment:
    template<typename T>
    struct alignas(CACHE_LINE) Slot {
        std::atomic<size_t> sequence;  // Turn indicator
        T data;                        // Actual payload
        // Compiler adds padding to reach the next multiple of CACHE_LINE
    };

    // Verify size at compile time (holds while T fits in one cache line)
    static_assert(sizeof(Slot<int>) == CACHE_LINE,
                  "Slot must be cache-line sized");

  3. Initialize the buffer with sequential sequence numbers:
    template<typename T, size_t Capacity>
    MPMCQueue<T, Capacity>::MPMCQueue() {
        for (size_t i = 0; i < Capacity; ++i) {
            buffer_[i].sequence.store(i, std::memory_order_relaxed);
        }
    }

Validation:

  • sizeof(Slot<T>) should equal 64 (or your cache line size) for small T; larger payloads round up to a multiple of the cache line
  • Check alignment with alignof(Slot<T>)
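
A throwaway check along these lines (assuming the standalone Slot template and CACHE_LINE constant from the steps above are in scope) makes the validation explicit:

#include <cstdio>

int main() {
    std::printf("sizeof(Slot<int>)  = %zu\n", sizeof(Slot<int>));   // expect CACHE_LINE for small T
    std::printf("alignof(Slot<int>) = %zu\n", alignof(Slot<int>));  // expect CACHE_LINE
}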

Phase 2: Basic Enqueue/Dequeue (Days 4-7)

Goal: Implement blocking enqueue and dequeue.

Steps:

  1. Implement enqueue:
    template<typename T, size_t Capacity>
    void MPMCQueue<T, Capacity>::enqueue(const T& value) {
     // 1. Claim position
     size_t pos = enqueue_pos_.fetch_add(1, std::memory_order_relaxed);
    
     // 2. Get slot
     Slot& slot = buffer_[pos & mask_];
    
     // 3. Wait for slot to be ready
     size_t expected_seq = pos;
     while (slot.sequence.load(std::memory_order_acquire) != expected_seq) {
         // Spin-wait hint for CPU
         #if defined(__x86_64__) || defined(_M_X64)
             _mm_pause();  // x86 pause instruction (from <immintrin.h>)
         #elif defined(__aarch64__)
             asm volatile("yield");  // ARM yield
         #endif
     }
    
     // 4. Store data
     slot.data = value;
    
     // 5. Signal consumer
     slot.sequence.store(pos + 1, std::memory_order_release);
    }
    
  2. Implement dequeue:
    template<typename T, size_t Capacity>
    T MPMCQueue<T, Capacity>::dequeue() {
     // 1. Claim position
     size_t pos = dequeue_pos_.fetch_add(1, std::memory_order_relaxed);
    
     // 2. Get slot
     Slot& slot = buffer_[pos & mask_];
    
     // 3. Wait for data
     size_t expected_seq = pos + 1;
     while (slot.sequence.load(std::memory_order_acquire) != expected_seq) {
         #if defined(__x86_64__) || defined(_M_X64)
             _mm_pause();
         #elif defined(__aarch64__)
             asm volatile("yield");
         #endif
     }
    
     // 4. Load data
     T value = std::move(slot.data);
    
     // 5. Signal next producer (for next cycle)
     slot.sequence.store(pos + mask_ + 1, std::memory_order_release);
    
     return value;
    }
    

Validation:

  • Single-threaded test: enqueue N items, dequeue N items, verify order
  • Two-thread test: one producer, one consumer
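
For example, a minimal single-threaded test (sizes chosen arbitrarily) is enough to catch gross ordering bugs before adding concurrency:

#include <cassert>

void test_single_thread_fifo() {
    MPMCQueue<int, 64> q;
    for (int i = 0; i < 64; ++i) q.enqueue(i);              // fill the queue exactly once
    for (int i = 0; i < 64; ++i) assert(q.dequeue() == i);  // items come back in FIFO order
}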

Phase 3: Non-Blocking Variants (Days 8-10)

Goal: Implement try_enqueue and try_dequeue that return immediately.

Steps:

  1. Implement try_enqueue:
    template<typename T, size_t Capacity>
    bool MPMCQueue<T, Capacity>::try_enqueue(const T& value) {
     size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
    
     while (true) {
         Slot& slot = buffer_[pos & mask_];
         size_t seq = slot.sequence.load(std::memory_order_acquire);
    
         intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
    
         if (diff == 0) {
             // Slot is ready, try to claim position
             if (enqueue_pos_.compare_exchange_weak(
                     pos, pos + 1, std::memory_order_relaxed)) {
                 // Claimed! Store data
                 slot.data = value;
                 slot.sequence.store(pos + 1, std::memory_order_release);
                 return true;
             }
             // CAS failed, loop will retry with updated pos
         } else if (diff < 0) {
             // Queue is full (producer caught up to consumer)
             return false;
         } else {
             // Another producer beat us, reload and retry
             pos = enqueue_pos_.load(std::memory_order_relaxed);
         }
     }
    }
    
  2. Implement try_dequeue:
    template<typename T, size_t Capacity>
    bool MPMCQueue<T, Capacity>::try_dequeue(T& value) {
     size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
    
     while (true) {
         Slot& slot = buffer_[pos & mask_];
         size_t seq = slot.sequence.load(std::memory_order_acquire);
    
         intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
    
         if (diff == 0) {
             // Data is ready, try to claim position
             if (dequeue_pos_.compare_exchange_weak(
                     pos, pos + 1, std::memory_order_relaxed)) {
                 // Claimed! Load data
                 value = std::move(slot.data);
                 slot.sequence.store(pos + mask_ + 1, std::memory_order_release);
                 return true;
             }
             // CAS failed, loop will retry with updated pos
         } else if (diff < 0) {
             // Queue is empty (consumer caught up to producer)
             return false;
         } else {
             // Another consumer beat us, reload and retry
             pos = dequeue_pos_.load(std::memory_order_relaxed);
         }
     }
    }
    

Key insight: The try variants use compare_exchange_weak instead of fetch_add. This allows:

  • Detecting full/empty conditions before claiming a slot
  • Backing off gracefully without advancing the position counter

Validation:

  • Test try_enqueue on full queue returns false
  • Test try_dequeue on empty queue returns false
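
A small check along these lines (capacity chosen arbitrarily) covers both edge cases:

#include <cassert>

void test_full_and_empty() {
    MPMCQueue<int, 4> q;
    int out;

    assert(!q.try_dequeue(out));                          // empty queue refuses to dequeue
    for (int i = 0; i < 4; ++i) assert(q.try_enqueue(i)); // fill to capacity
    assert(!q.try_enqueue(99));                           // full queue refuses to enqueue
    for (int i = 0; i < 4; ++i) {
        assert(q.try_dequeue(out) && out == i);           // drain in FIFO order
    }
    assert(!q.try_dequeue(out));                          // empty again
}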

Phase 4: Optimization (Days 11-14)

Goal: Tune for maximum throughput.

Steps:

  1. Backoff strategy: Replace tight spinning with progressive backoff (a usage sketch follows this list):
    struct Backoff {
     unsigned spin_count = 0;
     static constexpr unsigned SPIN_LIMIT = 1000;
     static constexpr unsigned YIELD_LIMIT = 10;
    
     void wait() {
         if (spin_count < SPIN_LIMIT) {
             // CPU spin with pause hint
             for (unsigned i = 0; i < (1u << (spin_count / 100)); ++i) {
                 #if defined(__x86_64__)
                     _mm_pause();
                 #elif defined(__aarch64__)
                     asm volatile("yield");
                 #endif
             }
             ++spin_count;
         } else if (spin_count < SPIN_LIMIT + YIELD_LIMIT) {
             std::this_thread::yield();
             ++spin_count;
         } else {
             std::this_thread::sleep_for(std::chrono::microseconds(1));
         }
     }
    
     void reset() { spin_count = 0; }
    };
    
  2. Prefetching: Hint the CPU about upcoming accesses:
    void enqueue(const T& value) {
     size_t pos = enqueue_pos_.fetch_add(1, std::memory_order_relaxed);
     Slot& slot = buffer_[pos & mask_];
    
     // Prefetch next slot for next enqueue
     __builtin_prefetch(&buffer_[(pos + 1) & mask_], 1, 3);
    
     // ... rest of enqueue
    }
    
  3. Verify no false sharing: Use perf to measure cache performance:
    perf stat -e cache-misses,cache-references ./mpmc_benchmark
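
As referenced in step 1, here is one way the Backoff helper might slot into the blocking enqueue wait loop; this is a sketch of one possible integration, not the only option:

template<typename T, size_t Capacity>
void MPMCQueue<T, Capacity>::enqueue(const T& value) {
    size_t pos = enqueue_pos_.fetch_add(1, std::memory_order_relaxed);
    Slot& slot = buffer_[pos & mask_];

    Backoff backoff;                       // fresh backoff state per operation
    while (slot.sequence.load(std::memory_order_acquire) != pos) {
        backoff.wait();                    // spin -> yield -> sleep as the wait drags on
    }

    slot.data = value;
    slot.sequence.store(pos + 1, std::memory_order_release);
}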
    

Phase 5: Benchmarking (Days 15-18)

Goal: Measure and compare performance.

Benchmark harness:

#include <atomic>
#include <chrono>
#include <cstdio>
#include <thread>
#include <vector>

template<typename Queue>
void benchmark(const char* name, Queue& q,
               int num_producers, int num_consumers,
               int items_per_producer) {

    std::atomic<size_t> produced{0};
    std::atomic<size_t> consumed{0};
    std::atomic<bool> done{false};

    auto start = std::chrono::high_resolution_clock::now();

    // Launch producers
    std::vector<std::thread> producers;
    for (int p = 0; p < num_producers; ++p) {
        producers.emplace_back([&, p] {
            for (int i = 0; i < items_per_producer; ++i) {
                q.enqueue(p * items_per_producer + i);
                produced.fetch_add(1, std::memory_order_relaxed);
            }
        });
    }

    // Launch consumers
    std::vector<std::thread> consumers;
    for (int c = 0; c < num_consumers; ++c) {
        consumers.emplace_back([&] {
            while (!done.load(std::memory_order_relaxed)) {
                int value;
                if (q.try_dequeue(value)) {
                    consumed.fetch_add(1, std::memory_order_relaxed);
                }
            }
            // Drain remaining
            int value;
            while (q.try_dequeue(value)) {
                consumed.fetch_add(1, std::memory_order_relaxed);
            }
        });
    }

    // Wait for producers
    for (auto& t : producers) t.join();
    done.store(true, std::memory_order_relaxed);
    for (auto& t : consumers) t.join();

    auto end = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

    size_t total = produced.load();
    double throughput = total / (duration.count() / 1000.0);

    printf("%s: %zu items in %lld ms = %.2f M items/sec\n",
           name, total, static_cast<long long>(duration.count()), throughput / 1e6);
}

Benchmarks to run:

  1. Single producer, single consumer (baseline)
  2. 4 producers, 4 consumers (balanced)
  3. 8 producers, 8 consumers (scaling)
  4. 1 producer, 8 consumers (producer-bound)
  5. 8 producers, 1 consumer (consumer-bound)
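
A driver along these lines (queue capacity and item counts chosen arbitrarily) exercises the configurations above with the harness shown earlier:

int main() {
    MPMCQueue<int, 65536> q;

    benchmark("1P/1C", q, 1, 1, 10'000'000);   // baseline
    benchmark("4P/4C", q, 4, 4,  2'500'000);   // balanced
    benchmark("8P/8C", q, 8, 8,  1'250'000);   // scaling
    benchmark("1P/8C", q, 1, 8, 10'000'000);   // producer-bound
    benchmark("8P/1C", q, 8, 1,  1'250'000);   // consumer-bound
}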

Phase 6: Stress Testing (Days 19-21)

Goal: Verify correctness under extreme conditions.

Stress tests:

  1. All items delivered exactly once:
    void test_exactness() {
     MPMCQueue<int, 1024> q;
     constexpr int N = 1000000;
     std::vector<std::atomic<int>> received(N);
    
     // ... run producers/consumers ...
    
     // Verify each item received exactly once
     for (int i = 0; i < N; ++i) {
         assert(received[i].load() == 1 && "Item not received exactly once!");
     }
    }
    
  2. Long-running stability:
    ./mpmc_stress --threads 16 --duration 3600  # 1 hour
    
  3. ThreadSanitizer:
    g++ -fsanitize=thread -g -O2 -o mpmc_tsan mpmc_test.cpp
    ./mpmc_tsan
    

Testing Strategy

Unit Tests

Test Description Expected Result
test_single_thread Enqueue/dequeue 1000 items sequentially All items in FIFO order
test_spsc 1 producer, 1 consumer, 100K items All items transferred
test_mpsc 4 producers, 1 consumer, 100K items each All 400K items received
test_spmc 1 producer, 4 consumers, 100K items All items received exactly once
test_mpmc 4P, 4C, 100K items each All 400K items received exactly once
test_full_queue Fill queue, verify try_enqueue fails Returns false when full
test_empty_queue Verify try_dequeue on empty Returns false when empty

Concurrency Tests

// Run with ThreadSanitizer
void test_no_data_races() {
    MPMCQueue<int, 256> q;

    std::vector<std::thread> threads;
    for (int i = 0; i < 8; ++i) {
        threads.emplace_back([&, i] {
            for (int j = 0; j < 10000; ++j) {
                if (i % 2 == 0) {
                    q.try_enqueue(j);  // non-blocking, so the test cannot hang if consumers finish early
                } else {
                    int v;
                    q.try_dequeue(v);
                }
            }
        });
    }

    for (auto& t : threads) t.join();
    // TSan will report any races
}

Property-Based Tests

  1. Conservation: Items in = Items out
  2. Per-producer FIFO: Items from each producer arrive in order
  3. Progress: System eventually empties (no deadlock)
  4. Liveness: With balanced P/C, queue doesn’t overflow/underflow
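
Per-producer FIFO, for instance, can be checked by tagging each item with its producer id and a per-producer counter; the sketch below shows the consumer-side check (the encoding and names are arbitrary), which verifies the property for the slice of items each consumer happens to receive:

#include <cassert>
#include <cstdint>
#include <vector>

// Each enqueued item carries its producer id and that producer's running counter.
struct Tagged { uint32_t producer; uint32_t seq; };

// Called by a consumer for every item it dequeues. last_seen is per-consumer,
// sized to the number of producers and initialized to -1.
void check_per_producer_fifo(std::vector<int64_t>& last_seen, Tagged item) {
    assert(static_cast<int64_t>(item.seq) > last_seen[item.producer] &&
           "items from one producer reached this consumer out of order");
    last_seen[item.producer] = item.seq;
}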

Common Pitfalls and Debugging

Pitfall 1: False Sharing

Symptom: Performance degrades with more threads; cache-miss counters spike.

Cause: Slots or position counters share cache lines.

Detection:

perf stat -e L1-dcache-load-misses,L1-dcache-loads ./benchmark

Solution:

// BAD: Positions share cache line
std::atomic<size_t> enqueue_pos;
std::atomic<size_t> dequeue_pos;  // Only 8 bytes apart!

// GOOD: Each on own cache line
alignas(64) std::atomic<size_t> enqueue_pos;
alignas(64) std::atomic<size_t> dequeue_pos;

Pitfall 2: Wrong Memory Ordering

Symptom: Consumers see garbage data or miss updates.

Cause: Using relaxed ordering where acquire/release needed.

Detection: ThreadSanitizer, running on ARM hardware.

Solution: Review acquire-release pairs:

// Producer must release after writing data
slot.data = value;
slot.sequence.store(pos + 1, std::memory_order_release);  // NOT relaxed!

// Consumer must acquire before reading data
while (slot.sequence.load(std::memory_order_acquire) != pos + 1) {}
value = slot.data;  // Safe to read now

Pitfall 3: Incorrect Sequence Calculation

Symptom: Queue gets stuck; threads spin forever.

Cause: Off-by-one in sequence number updates.

Solution: Trace through the state machine:

For slot at index I, position P (where P & mask == I):

Initial state:         sequence = I       (ready for producer at pos I)
After producer stores: sequence = P + 1   (ready for consumer at pos P)
After consumer loads:  sequence = P + Capacity  (ready for producer at pos P + Capacity)
                                  = P + mask + 1

Verify: P + mask + 1 = P + Capacity (since mask = Capacity - 1).
        This is the full, unmasked sequence value, not a slot index:
        sequences grow monotonically and never wrap; only the index wraps.

Pitfall 4: Non-Power-of-2 Size

Symptom: Crashes, wrong slot access, data corruption.

Cause: Using % instead of & for indexing.

Solution:

// Enforce at compile time
static_assert((Capacity & (Capacity - 1)) == 0, "Must be power of 2");

// Use bitwise AND, not modulo
Slot& slot = buffer_[pos & mask_];  // FAST
// NOT: buffer_[pos % Capacity];     // SLOW and doesn't work with sequence trick

Pitfall 5: Overflow After Long Running

Symptom: Queue breaks after running for days.

Cause: Position counters overflow 64-bit limit.

Analysis: At 10M ops/second:

  • 64-bit overflow after: 2^64 / 10M / 60 / 60 / 24 / 365 = 58,000 years

Reality: Not a practical concern with 64-bit counters.

Debugging Techniques

  1. Add logging (debug mode only):
    #ifdef DEBUG_QUEUE
    #define QUEUE_LOG(fmt, ...) fprintf(stderr, "[T%zu] " fmt "\n", \
     std::hash<std::thread::id>{}(std::this_thread::get_id()) % 100, ##__VA_ARGS__)
    #else
    #define QUEUE_LOG(fmt, ...)
    #endif
    
  2. Validate invariants:
    void debug_dump() {
     printf("enqueue_pos: %zu\n", enqueue_pos_.load());
     printf("dequeue_pos: %zu\n", dequeue_pos_.load());
     for (size_t i = 0; i < Capacity; ++i) {
         printf("  slot[%zu]: seq=%zu\n", i, buffer_[i].sequence.load());
     }
    }
    
  3. Use GDB with thread awareness:
    gdb ./mpmc_test
    (gdb) break enqueue
    (gdb) run
    (gdb) info threads
    (gdb) thread 2
    (gdb) bt
    

Extensions and Challenges

Extension 1: Wait-Free Variant

Convert the blocking operations to wait-free by bounding retry loops:

  • Each thread gets a “ticket” for progress
  • After N failed attempts, use a slower fallback path
  • Guarantees bounded worst-case time

Research: “A Scalable Lock-free Stack Algorithm” by Danny Hendler et al.

Extension 2: NUMA-Aware Version

For multi-socket systems, consider:

  • Per-NUMA-node queues with work stealing
  • Memory allocation pinned to NUMA nodes
  • Thread affinity for locality

Extension 3: Batched Operations

Amortize atomic operation overhead:

size_t enqueue_batch(T* items, size_t count);
size_t dequeue_batch(T* items, size_t max_count);

Claim multiple slots with single fetch_add.
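
A blocking sketch of the enqueue side, assuming a new enqueue_batch member matching the declaration above (this is illustrative, not part of the core specification):

// Claim `count` consecutive positions with one fetch_add, then fill each slot
// as it becomes ready. Callers should keep count well below Capacity.
template<typename T, size_t Capacity>
size_t MPMCQueue<T, Capacity>::enqueue_batch(T* items, size_t count) {
    size_t first = enqueue_pos_.fetch_add(count, std::memory_order_relaxed);
    for (size_t i = 0; i < count; ++i) {
        size_t pos = first + i;
        Slot& slot = buffer_[pos & mask_];
        while (slot.sequence.load(std::memory_order_acquire) != pos) { /* spin */ }
        slot.data = std::move(items[i]);
        slot.sequence.store(pos + 1, std::memory_order_release);
    }
    return count;  // blocking variant: every item is eventually stored
}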

Extension 4: Priority Support

Add priority levels:

  • Multiple queues (one per priority)
  • Weighted dequeue selection
  • Starvation prevention for low priorities

Extension 5: Integration with io_uring/epoll

Use the queue as event backbone:

  • Network I/O completion queue
  • Timer events
  • File I/O results

Challenge: Beat Folly MPMCQueue

Study Facebook’s Folly implementation and identify opportunities:

  • Their approach to memory reclamation
  • Their backoff strategies
  • Their NUMA optimizations

Can you match or exceed their throughput?


Resources

Essential Reading

Resource Topic Why Important
“C++ Concurrency in Action” Ch. 7.3 Lock-free queue design Foundation of this project
“The Art of Multiprocessor Programming” Ch. 10 Queues and stacks Theoretical background
Folly MPMCQueue Production implementation Industry-standard reference
rigtorp/MPMCQueue Minimal implementation Clean, understandable code
1024cores.net Dmitry Vyukov’s design Original sequence number technique

Reference Implementations

  • Folly MPMCQueue: Facebook’s production queue
  • Boost.Lockfree: Cross-platform, well-tested
  • moodycamel::ConcurrentQueue: Excellent single-header option
  • crossbeam-channel (Rust): Go channel semantics in Rust

Papers

  • Herlihy, “Wait-Free Synchronization” (1991)
  • Michael & Scott, “Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms” (1996)
  • Morrison & Afek, “Fast Concurrent Queues for x86 Processors” (2013)

Tools

Tool Purpose
ThreadSanitizer Detect data races
Valgrind (helgrind) Thread error detection
perf stat Cache miss analysis
cachegrind Detailed cache simulation
Intel VTune Advanced profiling

Self-Assessment Checklist

Before considering this project complete, verify:

Understanding (Can you explain…)

  • Why sequence numbers prevent the ABA problem in this design?
  • What memory ordering is required and why?
  • How cache-line alignment prevents false sharing?
  • The difference between lock-free and wait-free?
  • Why power-of-2 size enables fast indexing?

Implementation (Did you…)

  • Implement blocking enqueue/dequeue correctly?
  • Implement non-blocking try_enqueue/try_dequeue?
  • Add proper cache-line alignment?
  • Use correct memory ordering?
  • Handle all edge cases (empty, full)?

Testing (Have you verified…)

  • All items delivered exactly once under stress?
  • ThreadSanitizer reports no races?
  • Performance beats mutex version by 5x+?
  • Throughput scales with core count?
  • No deadlocks after long-running tests?

Performance (Did you achieve…)

  • > 8M items/second (4P/4C)?
  • < 500ns average latency?
  • Minimal cache misses per operation?

Submission/Completion Criteria

Your implementation is complete when:

  1. Functionality:
    • All unit tests pass
    • All concurrency tests pass (with TSan)
    • Stress test runs for 1+ hour without issues
  2. Performance:
    • Throughput >= 8M items/second (4P/4C, small payload)
    • Performance scales reasonably with thread count
    • Outperforms mutex-based queue by at least 5x
  3. Code Quality:
    • Clean, well-documented implementation
    • Proper use of memory ordering (justified in comments)
    • No compiler warnings at -Wall -Wextra
  4. Documentation:
    • README explaining design decisions
    • Benchmark results with methodology
    • Analysis of where time is spent
  5. Deliverables:
    • mpmc_queue.hpp: Header-only library
    • mpmc_test.cpp: Unit and stress tests
    • mpmc_benchmark.cpp: Performance benchmarks
    • README.md: Design document and results

Interview Questions You Should Be Able to Answer

After completing this project:

  1. “Walk me through how your MPMC queue works.”
    • Explain the ring buffer, sequence numbers, and producer/consumer coordination.
  2. “What makes it lock-free? Is it wait-free?”
    • Lock-free: Global progress guaranteed (some thread always makes progress)
    • Not wait-free: Individual threads can starve under contention
  3. “How do you prevent the ABA problem?”
    • Positions monotonically increase, never reused
    • Sequence numbers encode state, checked against expected position
  4. “What would break if you used relaxed memory ordering everywhere?”
    • Consumer might see stale/garbage data
    • Works on x86 by accident, breaks on ARM
  5. “How would you modify this for NUMA systems?”
    • Discuss per-node queues, work stealing, memory allocation strategies
  6. “Compare your design to Go channels and Rust crossbeam.”
    • Go: Uses similar techniques but with goroutine parking
    • Crossbeam: More advanced epoch-based reclamation for unbounded queues
  7. “What’s the throughput ceiling and what limits it?”
    • Atomic contention on position counters
    • Cache coherence traffic between cores
    • Memory bandwidth for data copy
  8. “How would you debug a subtle concurrency bug in this?”
    • ThreadSanitizer for race detection
    • Logging with thread ID and timestamps
    • Reduce to minimal reproduction case

This project represents the culmination of lock-free programming skills. Master it, and you’ll understand the concurrent primitives that power the world’s highest-performance systems.