Project 6: Lock-Free MPMC Queue
Quick Reference
| Attribute | Details |
|---|---|
| Difficulty | Master |
| Time Estimate | 3-4 weeks |
| Primary Language | C++ |
| Alternative Languages | C, Rust |
| Knowledge Area | Advanced Lock-Free / High-Performance Systems |
| Tools Required | g++/clang++, ThreadSanitizer, perf, cachegrind |
| Primary Reference | “C++ Concurrency in Action, Second Edition” by Anthony Williams, Chapter 7.3 |
| Prerequisites | Project 5 (Lock-Free Stack) completed. Deep understanding of memory ordering. Familiarity with cache architecture. |
Learning Objectives
By completing this project, you will be able to:
- Design and implement a bounded lock-free MPMC queue using ring buffers and sequence numbers
- Coordinate multiple producers and consumers without locks using atomic compare-and-swap operations
- Apply the sequence number trick to safely hand off slots between producers and consumers
- Eliminate false sharing through cache-line alignment and padding
- Achieve high throughput (8M+ items/second) through contention minimization
- Verify lock-free guarantees and understand the difference from wait-free
- Benchmark and compare your implementation against mutex-based and industry alternatives
- Debug subtle concurrency bugs using memory sanitizers and formal reasoning
Theoretical Foundation
Core Concepts
What is an MPMC Queue?
An MPMC (Multi-Producer Multi-Consumer) queue allows:
- Multiple threads to enqueue items concurrently (producers)
- Multiple threads to dequeue items concurrently (consumers)
- No mutual exclusion between operations (lock-free)
This is the “final boss” of lock-free data structures because it requires coordinating four kinds of concurrent interaction:
- Producer vs Producer (who claims which slot?)
- Consumer vs Consumer (who claims which item?)
- Producer vs Consumer at same slot (is data ready?)
- Wraparound (when buffer is full/empty)
Traditional Mutex-Based Queue:
┌─────────────────────────────────────────┐
│ │
P1─┼──┐ │
P2─┼──┼──► [LOCK] ──► QUEUE ──► [LOCK] ──►──┼──C1
P3─┼──┘ ├──C2
│ All operations serialize on lock │
└─────────────────────────────────────────┘
Lock-Free MPMC Queue:
┌─────────────────────────────────────────┐
│ │
P1─┼──────────────────┐ │
P2─┼──────────────────┼──► RING BUFFER ──►──┼──C1
P3─┼──────────────────┘ (per-slot ├──C2
│ sequencing) │
│ Operations proceed in parallel │
└─────────────────────────────────────────┘
Sequence Numbers: The Key Insight
The fundamental trick that makes lock-free MPMC queues work is the sequence number per slot. Each slot in the ring buffer has an atomic sequence number that acts as a “turn indicator”:
Sequence Number State Machine:
┌─────────────────────────────┐
│ │
▼ │
┌───────────────────────────────┐ │
│ sequence == position │ ───────────┘
│ "Slot ready for producer" │
└───────────────┬───────────────┘
│
│ Producer stores data,
│ sets sequence = position + 1
│
▼
┌───────────────────────────────┐
│ sequence == position + 1 │
│ "Slot ready for consumer" │
└───────────────┬───────────────┘
│
│ Consumer loads data,
│ sets sequence = position + buffer_size
│
▼
┌───────────────────────────────┐
│ sequence == next_position │ ──► Next cycle begins
│ "Slot ready for next producer"│
└───────────────────────────────┘
For a buffer of size N and position P:
- sequence == P: Slot is empty and ready for the producer claiming position P
- sequence == P + 1: Slot contains data for the consumer claiming position P
- sequence == P + N: Slot is ready for the producer claiming position P + N (next cycle)
Bounded Ring Buffer Structure
The queue uses a ring buffer with power-of-2 size for fast modular arithmetic:
Ring Buffer Layout (size = 8):
Index: 0 1 2 3 4 5 6 7
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐
Slot: │ S0 │ S1 │ S2 │ S3 │ S4 │ S5 │ S6 │ S7 │
└──▲──┴─────┴──▲──┴─────┴─────┴─────┴──▲──┴─────┘
│ │ │
dequeue_pos enqueue_pos wrap point
= 8 = 10 (pos & 7)
(8 & 7 = 0) (10 & 7 = 2)
Position → Index mapping: index = position & (size - 1)
index = position & mask
Why power of 2?
position % size = SLOW (division)
position & mask = FAST (single AND instruction)
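A tiny stand-alone sketch of the mapping (values chosen to match the diagram above; not part of the queue itself):
```cpp
#include <cstddef>
#include <cstdio>

int main() {
    constexpr std::size_t size = 8;        // power of 2
    constexpr std::size_t mask = size - 1; // 0b0111
    // Positions grow without bound; the mask folds them into [0, size).
    for (std::size_t pos : {std::size_t{8}, std::size_t{10}, std::size_t{15}}) {
        std::printf("position %zu -> index %zu\n", pos, pos & mask);
    }
    return 0;
}
```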
Cache-Line Alignment
Modern CPUs transfer memory in 64-byte cache lines. If two variables share a cache line, writing one invalidates the other in all cores’ caches (false sharing):
False Sharing Problem:
┌─────────────────────────────────────────────────────────────────┐
│ Cache Line 0 (64 bytes) │
│ ┌────────────────────┬────────────────────┬──────────────────┐ │
│ │ enqueue_pos (8B) │ dequeue_pos (8B) │ padding (48B) │ │
│ └────────┬───────────┴─────────┬──────────┴──────────────────┘ │
│ │ │ │
│ │ Producer writes │ Consumer writes │
│ │ here constantly │ here constantly │
│ │ │ │
│ └──────────┬──────────┘ │
│ │ │
│ CACHE LINE PING-PONG! │
│ Every write invalidates the other │
└─────────────────────────────────────────────────────────────────┘
Solution: Separate Cache Lines
┌─────────────────────────────────────────────────────────────────┐
│ Cache Line 0 (64 bytes) │
│ ┌────────────────────┬──────────────────────────────────────┐ │
│ │ enqueue_pos (8B) │ padding (56B) │ │
│ └────────────────────┴──────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ Cache Line 1 (64 bytes) │
│ ┌────────────────────┬──────────────────────────────────────┐ │
│ │ dequeue_pos (8B) │ padding (56B) │ │
│ └────────────────────┴──────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Now producers and consumers don't interfere!
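One way to express this layout in C++ is with alignas; a minimal sketch (Counters is an illustrative name, not part of the queue):
```cpp
#include <atomic>
#include <cstddef>

struct Counters {
    alignas(64) std::atomic<std::size_t> enqueue_pos{0}; // cache line 0
    alignas(64) std::atomic<std::size_t> dequeue_pos{0}; // forced onto the next cache line
};

// alignas(64) on the second member pushes it to the next 64-byte boundary,
// so the struct spans at least two cache lines.
static_assert(sizeof(Counters) >= 128, "counters should not share a cache line");
```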
Why This Matters
MPMC queues are the communication backbone of every major concurrent system:
| System | Use Case | Why MPMC? |
|---|---|---|
| Tokio (Rust) | Task queue | Distribute async tasks to worker threads |
| Go Scheduler | Run queue | Balance goroutines across OS threads |
| TBB (Intel) | Work stealing | Share tasks between thread pools |
| LMAX Disruptor | Event processing | Ultra-low-latency financial systems |
| Game Engines | Job systems | Parallel physics, rendering, AI |
Performance impact:
Throughput Comparison (8 threads, 64-slot queue):
std::queue + mutex: 1.2M items/second ████
boost::lockfree::queue: 7.5M items/second ██████████████████████████████████
Well-tuned MPMC: 8.9M items/second ████████████████████████████████████████
0 2 4 6 8 10 (millions/second)
The difference between 1.2M and 8.9M is a 7.4x speedup. In systems processing billions of messages daily, this translates to massive infrastructure cost savings.
Historical Context
The modern bounded MPMC queue design evolved through decades of research:
1990s: Lock-Free Foundations
- Maurice Herlihy proves universality of compare-and-swap (CAS)
- Michael & Scott develop classic lock-free queue (unbounded, single-word CAS)
2000s: Practical Implementations
- Intel TBB concurrent_queue brings lock-free to mainstream
- LMAX Disruptor shows cache-aware design can achieve sub-microsecond latency
2010s: Bounded MPMC Refinement
- Facebook Folly MPMCQueue achieves predictable memory usage
- Dmitry Vyukov’s MPMC designs influence Rust crossbeam and Go channels
- rigtorp/MPMCQueue demonstrates minimal, high-performance C++ implementation
2020s: Hardware-Aware Optimization
- ARM memory model considerations (weaker than x86)
- NUMA-aware queue designs for multi-socket systems
- Seqlock-inspired techniques for read-heavy workloads
Common Misconceptions
Misconception 1: “Lock-free means faster”
Reality: Lock-free guarantees progress (no deadlock), not speed. A well-tuned mutex can outperform a poorly designed lock-free structure.
Lock-free wins when:
- High contention (many threads fighting for access)
- Short critical sections (mutex overhead dominates)
- Real-time requirements (bounded worst-case latency)
Lock-free loses when:
- Low contention (mutex barely ever waits)
- Complex operations (CAS retry loops waste cycles)
- Single producer/consumer (simpler designs exist)
Misconception 2: “Just use atomic and you’re done”
Reality: Making individual operations atomic doesn’t make the algorithm correct. The MPMC queue requires carefully ordered sequences of operations:
// WRONG: Each operation is atomic, but together they're broken
void broken_enqueue(T value) {
size_t pos = enqueue_pos.fetch_add(1); // 1. Claim position
buffer[pos % size] = value; // 2. Store data
// No synchronization! Consumer might see garbage!
}
// RIGHT: Sequence number coordinates handoff
void correct_enqueue(T value) {
size_t pos = enqueue_pos.fetch_add(1); // 1. Claim position
Slot& slot = buffer[pos & mask];
while (slot.sequence.load(acquire) != pos) { } // 2. Wait for slot
slot.data = value; // 3. Store data
slot.sequence.store(pos + 1, release); // 4. Signal consumer
}
Misconception 3: “ABA problem is only for pointers”
Reality: The ABA problem can occur with any value that cycles. In MPMC queues, we avoid it by:
- Using monotonically increasing positions (never reused)
- Sequence numbers that encode state, not identity
Misconception 4: “x86 doesn’t need memory ordering”
Reality: x86 has a strong memory model (TSO) that prevents many reorderings, BUT:
- Compiler reorderings still happen
- Atomic operations need correct memory order for correctness
- Code must work on ARM (phones, Apple Silicon, AWS Graviton)
// This "works" on x86 by accident, but breaks on ARM:
sequence.store(pos + 1, relaxed); // Consumer might not see data!
// Correct on all platforms:
sequence.store(pos + 1, release); // Data writes happen-before this
Project Specification
What You’re Building
A production-quality bounded lock-free MPMC queue library with:
- Core queue implementation with configurable capacity
- Thread-safe enqueue and dequeue operations
- Try variants that don’t block
- Comprehensive benchmarks comparing against alternatives
- Stress tests for correctness verification
Functional Requirements
# Build and run benchmarks
$ mkdir build && cd build
$ cmake .. && make
$ ./mpmc_benchmark
# Run stress tests
$ ./mpmc_stress --threads 16 --duration 60
# Run correctness tests
$ ./mpmc_tests
Expected Output
$ ./mpmc_queue_benchmark
Queue Configuration:
Capacity: 65536 slots
Slot size: 64 bytes (cache-line aligned)
Producers: 4
Consumers: 4
Throughput test (10 seconds):
Total items transferred: 89,000,000
Throughput: 8.9M items/second
Avg latency: 450ns (producer enqueue to consumer dequeue)
P99 latency: 1.2us
Comparison:
std::queue + std::mutex: 1.2M items/second
Boost.Lockfree MPMC: 7.5M items/second
Our implementation: 8.9M items/second
Stress test (vary producer/consumer ratios):
1P/4C: 3.2M items/sec (producer-bound)
4P/1C: 3.1M items/sec (consumer-bound)
4P/4C: 8.9M items/sec (balanced)
8P/8C: 12.1M items/sec (scales with cores)
Correctness tests:
All items delivered exactly once: PASS
FIFO order per-producer: PASS
No deadlocks after 1 hour: PASS
Non-Functional Requirements
| Requirement | Target |
|---|---|
| Throughput | > 8M items/second (4P/4C) |
| Latency (avg) | < 500ns enqueue-to-dequeue |
| Latency (P99) | < 2us |
| Memory | Fixed allocation (no dynamic alloc in steady state) |
| Correctness | Zero data loss, no duplicates |
| Progress | Lock-free (starvation possible, deadlock impossible) |
Solution Architecture
High-Level Design
┌──────────────────────────────────────────────────────────────────────────┐
│ MPMCQueue<T, Size> │
├──────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Cache Line 0 (64 bytes) │ │
│ │ ┌─────────────────────────┬────────────────────────────────────┐│ │
│ │ │ enqueue_pos: atomic<u64>│ padding ││ │
│ │ │ (producers claim here) │ ││ │
│ │ └─────────────────────────┴────────────────────────────────────┘│ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Cache Line 1 (64 bytes) │ │
│ │ ┌─────────────────────────┬────────────────────────────────────┐│ │
│ │ │ dequeue_pos: atomic<u64>│ padding ││ │
│ │ │ (consumers claim here) │ ││ │
│ │ └─────────────────────────┴────────────────────────────────────┘│ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Ring Buffer (Size slots) │ │
│ │ │ │
│ │ Slot 0 Slot 1 Slot 2 ... Slot N-1 │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │seq: 0 │ │seq: 1 │ │seq: 2 │ ... │seq: N-1│ │ │
│ │ │data: T │ │data: T │ │data: T │ │data: T │ │ │
│ │ │padding │ │padding │ │padding │ │padding │ │ │
│ │ └────────┘ └────────┘ └────────┘ └────────┘ │ │
│ │ 64B 64B 64B 64B │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
├──────────────────────────────────────────────────────────────────────────┤
│ Operations: │
│ - enqueue(T) : void (blocking push) │
│ - try_enqueue(T) : bool (non-blocking push) │
│ - dequeue() : T (blocking pop) │
│ - try_dequeue(T&) : bool (non-blocking pop) │
│ - size_approx() : size_t (approximate count) │
└──────────────────────────────────────────────────────────────────────────┘
Key Components
1. Slot Structure
Each slot in the ring buffer contains:
struct alignas(64) Slot { // Aligned to cache line
std::atomic<size_t> sequence; // Turn indicator
T data; // Actual payload
// Implicit padding to 64 bytes
};
Why cache-line alignment?
- Prevents false sharing between adjacent slots
- Different threads can work on different slots without cache invalidation
- Critical for scaling to many cores
2. Position Counters
Two atomic counters track the next position for producers and consumers:
alignas(64) std::atomic<size_t> enqueue_pos{0}; // Own cache line
alignas(64) std::atomic<size_t> dequeue_pos{0}; // Own cache line
These counters monotonically increase, wrapping around via the mask:
actual_index = position & (buffer_size - 1)
3. Sequence Number Protocol
The sequence number in each slot coordinates the producer-consumer handoff:
Initial state (empty queue of size 4):
Position: enqueue_pos = 0, dequeue_pos = 0
Slot[0].sequence = 0 (ready for producer at pos 0)
Slot[1].sequence = 1 (ready for producer at pos 1)
Slot[2].sequence = 2 (ready for producer at pos 2)
Slot[3].sequence = 3 (ready for producer at pos 3)
After producer claims position 0 and stores data:
Slot[0].sequence = 1 (ready for consumer at pos 0)
After consumer claims position 0 and loads data:
Slot[0].sequence = 4 (ready for producer at pos 4 = next cycle)
Data Structures
#include <atomic>
#include <vector>
#include <cstdint>
#include <new> // For hardware_destructive_interference_size
// Cache line size (typically 64 bytes)
#ifdef __cpp_lib_hardware_interference_size
constexpr size_t CACHE_LINE = std::hardware_destructive_interference_size;
#else
constexpr size_t CACHE_LINE = 64;
#endif
template<typename T, size_t Capacity>
class MPMCQueue {
static_assert((Capacity & (Capacity - 1)) == 0,
"Capacity must be power of 2");
static_assert(Capacity >= 2, "Capacity must be at least 2");
private:
struct alignas(CACHE_LINE) Slot {
std::atomic<size_t> sequence;
T data;
// Padding is implicit due to alignas
};
static constexpr size_t mask_ = Capacity - 1;
// Each on its own cache line to prevent false sharing
alignas(CACHE_LINE) std::atomic<size_t> enqueue_pos_{0};
alignas(CACHE_LINE) std::atomic<size_t> dequeue_pos_{0};
// The ring buffer
alignas(CACHE_LINE) Slot buffer_[Capacity];
public:
MPMCQueue();
void enqueue(const T& value);
void enqueue(T&& value);
bool try_enqueue(const T& value);
bool try_enqueue(T&& value);
T dequeue();
bool try_dequeue(T& value);
size_t size_approx() const;
bool empty() const;
static constexpr size_t capacity() { return Capacity; }
};
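Before implementing the operations, it may help to see the intended usage; a minimal sketch assuming the interface declared above:
```cpp
#include <cstdio>
#include <thread>

int main() {
    MPMCQueue<int, 1024> q;   // capacity must be a power of 2

    std::thread producer([&] {
        for (int i = 0; i < 100; ++i) q.enqueue(i);        // blocks while the queue is full
    });
    std::thread consumer([&] {
        long long sum = 0;
        for (int i = 0; i < 100; ++i) sum += q.dequeue();  // blocks while the queue is empty
        std::printf("sum = %lld\n", sum);
    });

    producer.join();
    consumer.join();
    return 0;
}
```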
Producer Algorithm (Enqueue)
enqueue(value):
1. pos = enqueue_pos.fetch_add(1, relaxed) // Claim a position
2. slot = buffer[pos & mask] // Find slot
3. while slot.sequence.load(acquire) != pos: // Wait until slot ready
pause/yield // Spin-wait hint
4. slot.data = move(value) // Store data
5. slot.sequence.store(pos + 1, release) // Signal consumer
Sequence == pos means:
"This slot is empty and the producer at position 'pos' can use it"
Setting sequence = pos + 1 means:
"Data is now stored; consumer at position 'pos' can take it"
Consumer Algorithm (Dequeue)
dequeue():
1. pos = dequeue_pos.fetch_add(1, relaxed) // Claim a position
2. slot = buffer[pos & mask] // Find slot
3. while slot.sequence.load(acquire) != pos + 1: // Wait for data
pause/yield // Spin-wait hint
4. value = move(slot.data) // Load data
5. slot.sequence.store(pos + mask + 1, release) // Ready for next producer
6. return value
Sequence == pos + 1 means:
"This slot has data for consumer at position 'pos'"
Setting sequence = pos + mask + 1 means:
"Slot is now empty; producer at position 'pos + Capacity' can use it"
(This is the next cycle for this slot index)
Memory Ordering Analysis
| Operation | Memory Order | Why |
|---|---|---|
| `fetch_add` on position | `relaxed` | Just need atomicity; ordering comes from sequence |
| `sequence.load` in spin-wait | `acquire` | Need to see data stored before sequence update |
| `sequence.store` after data | `release` | Make data visible before signaling |
The acquire-release pairs create synchronization:
- Producer’s `store(release)` synchronizes-with Consumer’s `load(acquire)`
- This guarantees the consumer sees the data the producer wrote
Implementation Guide
Phase 1: Core Data Structures (Days 1-3)
Goal: Set up the slot structure and ring buffer with correct alignment.
Steps:
- Define the cache line constant:
```cpp
#ifdef __cpp_lib_hardware_interference_size
constexpr size_t CACHE_LINE = std::hardware_destructive_interference_size;
#else
constexpr size_t CACHE_LINE = 64;
#endif
```
- Create the Slot structure with proper alignment:
```cpp
template<typename T>
struct alignas(CACHE_LINE) Slot {
    std::atomic<size_t> sequence;
    T data;
    // Compiler adds padding to reach CACHE_LINE size
};

// Verify size at compile time
static_assert(sizeof(Slot<int>) == CACHE_LINE, "Slot should occupy exactly one cache line");
```
- Initialize the buffer with sequential sequence numbers:
```cpp
template<typename T, size_t Capacity>
MPMCQueue<T, Capacity>::MPMCQueue() {
    for (size_t i = 0; i < Capacity; ++i) {
        buffer_[i].sequence.store(i, std::memory_order_relaxed);
    }
}
```
Validation:
- sizeof(Slot<T>) should equal 64 (or your cache line size)
- Check alignment with alignof(Slot<T>)
Phase 2: Basic Enqueue/Dequeue (Days 4-7)
Goal: Implement blocking enqueue and dequeue.
Steps:
- Implement enqueue:
```cpp
template<typename T, size_t Capacity>
void MPMCQueue<T, Capacity>::enqueue(const T& value) {
    // 1. Claim position
    size_t pos = enqueue_pos_.fetch_add(1, std::memory_order_relaxed);
    // 2. Get slot
    Slot& slot = buffer_[pos & mask_];
    // 3. Wait for slot to be ready
    size_t expected_seq = pos;
    while (slot.sequence.load(std::memory_order_acquire) != expected_seq) {
        // Spin-wait hint for CPU
#if defined(__x86_64__) || defined(_M_X64)
        _mm_pause();              // x86 pause instruction
#elif defined(__aarch64__)
        asm volatile("yield");    // ARM yield
#endif
    }
    // 4. Store data
    slot.data = value;
    // 5. Signal consumer
    slot.sequence.store(pos + 1, std::memory_order_release);
}
```
- Implement dequeue:
```cpp
template<typename T, size_t Capacity>
T MPMCQueue<T, Capacity>::dequeue() {
    // 1. Claim position
    size_t pos = dequeue_pos_.fetch_add(1, std::memory_order_relaxed);
    // 2. Get slot
    Slot& slot = buffer_[pos & mask_];
    // 3. Wait for data
    size_t expected_seq = pos + 1;
    while (slot.sequence.load(std::memory_order_acquire) != expected_seq) {
#if defined(__x86_64__) || defined(_M_X64)
        _mm_pause();
#elif defined(__aarch64__)
        asm volatile("yield");
#endif
    }
    // 4. Load data
    T value = std::move(slot.data);
    // 5. Signal next producer (for next cycle)
    slot.sequence.store(pos + mask_ + 1, std::memory_order_release);
    return value;
}
```
Validation:
- Single-threaded test: enqueue N items, dequeue N items, verify order
- Two-thread test: one producer, one consumer
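One possible shape for these validation tests (a sketch; test names are illustrative and assume the MPMCQueue above):
```cpp
#include <cassert>
#include <thread>

void test_single_thread() {
    MPMCQueue<int, 16> q;
    for (int i = 0; i < 16; ++i) q.enqueue(i);
    for (int i = 0; i < 16; ++i) assert(q.dequeue() == i);    // FIFO order preserved
}

void test_one_producer_one_consumer() {
    MPMCQueue<int, 64> q;
    constexpr int N = 100000;
    std::thread producer([&] { for (int i = 0; i < N; ++i) q.enqueue(i); });
    std::thread consumer([&] {
        for (int i = 0; i < N; ++i) assert(q.dequeue() == i); // single producer => strict order
    });
    producer.join();
    consumer.join();
}
```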
Phase 3: Non-Blocking Variants (Days 8-10)
Goal: Implement try_enqueue and try_dequeue that return immediately.
Steps:
- Implement try_enqueue:
```cpp
template<typename T, size_t Capacity>
bool MPMCQueue<T, Capacity>::try_enqueue(const T& value) {
    size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
    while (true) {
        Slot& slot = buffer_[pos & mask_];
        size_t seq = slot.sequence.load(std::memory_order_acquire);
        intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
        if (diff == 0) {
            // Slot is ready, try to claim position
            if (enqueue_pos_.compare_exchange_weak(
                    pos, pos + 1, std::memory_order_relaxed)) {
                // Claimed! Store data
                slot.data = value;
                slot.sequence.store(pos + 1, std::memory_order_release);
                return true;
            }
            // CAS failed, loop will retry with updated pos
        } else if (diff < 0) {
            // Queue is full (producer caught up to consumer)
            return false;
        } else {
            // Another producer beat us, reload and retry
            pos = enqueue_pos_.load(std::memory_order_relaxed);
        }
    }
}
```
- Implement try_dequeue:
```cpp
template<typename T, size_t Capacity>
bool MPMCQueue<T, Capacity>::try_dequeue(T& value) {
    size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
    while (true) {
        Slot& slot = buffer_[pos & mask_];
        size_t seq = slot.sequence.load(std::memory_order_acquire);
        intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
        if (diff == 0) {
            // Data is ready, try to claim position
            if (dequeue_pos_.compare_exchange_weak(
                    pos, pos + 1, std::memory_order_relaxed)) {
                // Claimed! Load data
                value = std::move(slot.data);
                slot.sequence.store(pos + mask_ + 1, std::memory_order_release);
                return true;
            }
            // CAS failed, loop will retry with updated pos
        } else if (diff < 0) {
            // Queue is empty (consumer caught up to producer)
            return false;
        } else {
            // Another consumer beat us, reload and retry
            pos = dequeue_pos_.load(std::memory_order_relaxed);
        }
    }
}
```
Key insight: The try variants use compare_exchange_weak instead of fetch_add. This allows:
- Detecting full/empty conditions before claiming a slot
- Backing off gracefully without advancing the position counter
Validation:
- Test try_enqueue on full queue returns false
- Test try_dequeue on empty queue returns false
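A sketch of those two checks, using a deliberately tiny capacity so full and empty are easy to reach (the test name is illustrative):
```cpp
#include <cassert>

void test_try_variants() {
    MPMCQueue<int, 4> q;

    // Fill to capacity; the next try_enqueue must report "full".
    for (int i = 0; i < 4; ++i) assert(q.try_enqueue(i));
    assert(!q.try_enqueue(99));

    // Drain; the next try_dequeue must report "empty".
    int v = 0;
    for (int i = 0; i < 4; ++i) {
        assert(q.try_dequeue(v));
        assert(v == i);
    }
    assert(!q.try_dequeue(v));
}
```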
Phase 4: Optimization (Days 11-14)
Goal: Tune for maximum throughput.
Steps:
- Backoff strategy: Replace tight spinning with progressive backoff:
```cpp
struct Backoff {
    unsigned spin_count = 0;
    static constexpr unsigned SPIN_LIMIT = 1000;
    static constexpr unsigned YIELD_LIMIT = 10;

    void wait() {
        if (spin_count < SPIN_LIMIT) {
            // CPU spin with pause hint
            for (unsigned i = 0; i < (1u << (spin_count / 100)); ++i) {
#if defined(__x86_64__)
                _mm_pause();
#elif defined(__aarch64__)
                asm volatile("yield");
#endif
            }
            ++spin_count;
        } else if (spin_count < SPIN_LIMIT + YIELD_LIMIT) {
            std::this_thread::yield();
            ++spin_count;
        } else {
            std::this_thread::sleep_for(std::chrono::microseconds(1));
        }
    }

    void reset() { spin_count = 0; }
};
```
- Prefetching: Hint the CPU about upcoming accesses:
```cpp
void enqueue(const T& value) {
    size_t pos = enqueue_pos_.fetch_add(1, std::memory_order_relaxed);
    Slot& slot = buffer_[pos & mask_];
    // Prefetch next slot for next enqueue
    __builtin_prefetch(&buffer_[(pos + 1) & mask_], 1, 3);
    // ... rest of enqueue
}
```
- Verify no false sharing: Use perf to measure cache performance:
perf stat -e cache-misses,cache-references ./mpmc_benchmark
Phase 5: Benchmarking (Days 15-18)
Goal: Measure and compare performance.
Benchmark harness:
#include <chrono>
#include <thread>
#include <vector>
template<typename Queue>
void benchmark(const char* name, Queue& q,
int num_producers, int num_consumers,
int items_per_producer) {
std::atomic<size_t> produced{0};
std::atomic<size_t> consumed{0};
std::atomic<bool> done{false};
auto start = std::chrono::high_resolution_clock::now();
// Launch producers
std::vector<std::thread> producers;
for (int p = 0; p < num_producers; ++p) {
producers.emplace_back([&, p] {
for (int i = 0; i < items_per_producer; ++i) {
q.enqueue(p * items_per_producer + i);
produced.fetch_add(1, std::memory_order_relaxed);
}
});
}
// Launch consumers
std::vector<std::thread> consumers;
for (int c = 0; c < num_consumers; ++c) {
consumers.emplace_back([&] {
while (!done.load(std::memory_order_relaxed)) {
int value;
if (q.try_dequeue(value)) {
consumed.fetch_add(1, std::memory_order_relaxed);
}
}
// Drain remaining
int value;
while (q.try_dequeue(value)) {
consumed.fetch_add(1, std::memory_order_relaxed);
}
});
}
// Wait for producers
for (auto& t : producers) t.join();
done.store(true, std::memory_order_relaxed);
for (auto& t : consumers) t.join();
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
size_t total = produced.load();
double throughput = total / (duration.count() / 1000.0);
printf("%s: %zu items in %lld ms = %.2f M items/sec\n",
name, total, duration.count(), throughput / 1e6);
}
Benchmarks to run:
- Single producer, single consumer (baseline)
- 4 producers, 4 consumers (balanced)
- 8 producers, 8 consumers (scaling)
- 1 producer, 8 consumers (producer-bound)
- 8 producers, 1 consumer (consumer-bound)
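A driver for these configurations might look like the following sketch (capacity and item counts are arbitrary choices; the queue is heap-allocated because 65536 cache-line-sized slots are too large for the stack):
```cpp
#include <memory>

int main() {
    constexpr int items_per_producer = 1'000'000;

    auto run = [&](const char* name, int producers, int consumers) {
        auto q = std::make_unique<MPMCQueue<int, 65536>>();   // ~4 MB of slots
        benchmark(name, *q, producers, consumers, items_per_producer);
    };

    run("1P/1C", 1, 1);
    run("4P/4C", 4, 4);
    run("8P/8C", 8, 8);
    run("1P/8C", 1, 8);
    run("8P/1C", 8, 1);
    return 0;
}
```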
Phase 6: Stress Testing (Days 19-21)
Goal: Verify correctness under extreme conditions.
Stress tests:
- All items delivered exactly once:
```cpp
void test_exactness() {
    MPMCQueue<int, 1024> q;
    constexpr int N = 1000000;
    std::vector<std::atomic<int>> received(N);
    // ... run producers/consumers ...
    // Verify each item received exactly once
    for (int i = 0; i < N; ++i) {
        assert(received[i].load() == 1 && "Item not received exactly once!");
    }
}
```
- Long-running stability:
./mpmc_stress --threads 16 --duration 3600   # 1 hour
- ThreadSanitizer:
g++ -fsanitize=thread -g -O2 -o mpmc_tsan mpmc_test.cpp
./mpmc_tsan
Testing Strategy
Unit Tests
| Test | Description | Expected Result |
|---|---|---|
| test_single_thread | Enqueue/dequeue 1000 items sequentially | All items in FIFO order |
| test_spsc | 1 producer, 1 consumer, 100K items | All items transferred |
| test_mpsc | 4 producers, 1 consumer, 100K items each | All 400K items received |
| test_spmc | 1 producer, 4 consumers, 100K items | All items received exactly once |
| test_mpmc | 4P, 4C, 100K items each | All 400K items received exactly once |
| test_full_queue | Fill queue, verify try_enqueue fails | Returns false when full |
| test_empty_queue | Verify try_dequeue on empty | Returns false when empty |
Concurrency Tests
// Run with ThreadSanitizer
void test_no_data_races() {
MPMCQueue<int, 256> q;
std::vector<std::thread> threads;
for (int i = 0; i < 8; ++i) {
threads.emplace_back([&, i] {
for (int j = 0; j < 10000; ++j) {
if (i % 2 == 0) {
q.try_enqueue(j); // non-blocking, so producers can't hang on a full queue after consumers exit
} else {
int v;
q.try_dequeue(v);
}
}
});
}
for (auto& t : threads) t.join();
// TSan will report any races
}
Property-Based Tests
- Conservation: Items in = Items out
- Per-producer FIFO: Items from each producer arrive in order (see the sketch after this list)
- Progress: System eventually empties (no deadlock)
- Liveness: With balanced P/C, queue doesn’t overflow/underflow
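The first two properties can be checked with a sketch like this one, using a single consumer so per-producer order is observable (assumes the MPMCQueue above; packing producer id and sequence into one 64-bit word is an illustrative choice):
```cpp
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

void test_conservation_and_order() {
    constexpr uint64_t P = 4, N = 100000;
    MPMCQueue<uint64_t, 1024> q;

    std::vector<std::thread> producers;
    for (uint64_t p = 0; p < P; ++p) {
        producers.emplace_back([&, p] {
            for (uint64_t i = 0; i < N; ++i) {
                q.enqueue((p << 32) | i);          // high bits: producer, low bits: sequence
            }
        });
    }

    std::vector<uint64_t> expected(P, 0);          // next expected sequence per producer
    for (uint64_t i = 0; i < P * N; ++i) {         // conservation: exactly P*N items come out
        uint64_t v = q.dequeue();
        uint64_t prod = v >> 32;
        uint64_t seq  = v & 0xffffffffu;
        assert(seq == expected[prod]++);           // per-producer FIFO
    }

    for (auto& t : producers) t.join();
}
```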
Common Pitfalls and Debugging
Pitfall 1: False Sharing
Symptom: Performance degrades with more threads; cache-miss counters spike.
Cause: Slots or position counters share cache lines.
Detection:
perf stat -e L1-dcache-load-misses,L1-dcache-loads ./benchmark
Solution:
// BAD: Positions share cache line
std::atomic<size_t> enqueue_pos;
std::atomic<size_t> dequeue_pos; // Only 8 bytes apart!
// GOOD: Each on own cache line
alignas(64) std::atomic<size_t> enqueue_pos;
alignas(64) std::atomic<size_t> dequeue_pos;
Pitfall 2: Wrong Memory Ordering
Symptom: Consumers see garbage data or miss updates.
Cause: Using relaxed ordering where acquire/release needed.
Detection: ThreadSanitizer, running on ARM hardware.
Solution: Review acquire-release pairs:
// Producer must release after writing data
slot.data = value;
slot.sequence.store(pos + 1, std::memory_order_release); // NOT relaxed!
// Consumer must acquire before reading data
while (slot.sequence.load(std::memory_order_acquire) != pos + 1) {}
value = slot.data; // Safe to read now
Pitfall 3: Incorrect Sequence Calculation
Symptom: Queue gets stuck; threads spin forever.
Cause: Off-by-one in sequence number updates.
Solution: Trace through the state machine:
For slot at index I, position P (where P & mask == I):
Initial state: sequence = I (ready for producer at pos I)
After producer stores: sequence = P + 1 (ready for consumer at pos P)
After consumer loads: sequence = P + Capacity (ready for producer at pos P + Capacity)
= P + mask + 1
Verify: P + mask + 1 = P + Capacity, the full (unmasked) sequence value for the next cycle.
Sequence values are positions, not masked indices.
The sequence keeps growing and never wraps back into [0, Capacity).
Pitfall 4: Non-Power-of-2 Size
Symptom: Crashes, wrong slot access, data corruption.
Cause: Using % instead of & for indexing.
Solution:
// Enforce at compile time
static_assert((Capacity & (Capacity - 1)) == 0, "Must be power of 2");
// Use bitwise AND, not modulo
Slot& slot = buffer_[pos & mask_]; // FAST
// NOT: buffer_[pos % Capacity]; // SLOW and doesn't work with sequence trick
Pitfall 5: Overflow After Long Running
Symptom: Queue breaks after running for days.
Cause: Position counters overflow 64-bit limit.
Analysis: At 10M ops/second:
- 64-bit overflow after: 2^64 / 10M / 60 / 60 / 24 / 365 = 58,000 years
Reality: Not a practical concern with 64-bit counters.
Debugging Techniques
- Add logging (debug mode only):
```cpp
#ifdef DEBUG_QUEUE
#define QUEUE_LOG(fmt, ...) fprintf(stderr, "[T%zu] " fmt "\n", \
    std::hash<std::thread::id>{}(std::this_thread::get_id()) % 100, ##__VA_ARGS__)
#else
#define QUEUE_LOG(fmt, ...)
#endif
```
- Validate invariants:
```cpp
void debug_dump() {
    printf("enqueue_pos: %zu\n", enqueue_pos_.load());
    printf("dequeue_pos: %zu\n", dequeue_pos_.load());
    for (size_t i = 0; i < Capacity; ++i) {
        printf("  slot[%zu]: seq=%zu\n", i, buffer_[i].sequence.load());
    }
}
```
- Use GDB with thread awareness:
gdb ./mpmc_test
(gdb) break enqueue
(gdb) run
(gdb) info threads
(gdb) thread 2
(gdb) bt
Extensions and Challenges
Extension 1: Wait-Free Variant
Convert the blocking operations to wait-free by bounding retry loops:
- Each thread gets a “ticket” for progress
- After N failed attempts, use a slower fallback path
- Guarantees bounded worst-case time
Research: “A Scalable Lock-free Stack Algorithm” by Danny Hendler et al.
Extension 2: NUMA-Aware Version
For multi-socket systems, consider:
- Per-NUMA-node queues with work stealing
- Memory allocation pinned to NUMA nodes
- Thread affinity for locality
Extension 3: Batched Operations
Amortize atomic operation overhead:
size_t enqueue_batch(T* items, size_t count);
size_t dequeue_batch(T* items, size_t max_count);
Claim multiple slots with single fetch_add.
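One possible blocking enqueue_batch, assuming the signatures above are added as members of MPMCQueue (a sketch, not a tuned implementation):
```cpp
template<typename T, size_t Capacity>
size_t MPMCQueue<T, Capacity>::enqueue_batch(T* items, size_t count) {
    // One fetch_add claims `count` consecutive positions for this producer.
    size_t first = enqueue_pos_.fetch_add(count, std::memory_order_relaxed);

    for (size_t i = 0; i < count; ++i) {
        size_t pos = first + i;
        Slot& slot = buffer_[pos & mask_];
        // Same per-slot handshake as the single enqueue: wait until the slot is free,
        // store the item, then publish it to consumers.
        while (slot.sequence.load(std::memory_order_acquire) != pos) { /* spin */ }
        slot.data = std::move(items[i]);
        slot.sequence.store(pos + 1, std::memory_order_release);
    }
    return count;   // blocking variant: every item is eventually enqueued
}
```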
Extension 4: Priority Support
Add priority levels:
- Multiple queues (one per priority)
- Weighted dequeue selection
- Starvation prevention for low priorities
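One simple way to layer priorities on top of the queue (a sketch with strict priority; PriorityMPMC is an illustrative name, and a weighted scheme would be needed to prevent starvation of low levels):
```cpp
#include <cstddef>

template<typename T, size_t Capacity, size_t Levels = 3>
class PriorityMPMC {
    MPMCQueue<T, Capacity> queues_[Levels];   // one lock-free queue per priority level
public:
    bool try_enqueue(const T& value, size_t level) {
        return queues_[level].try_enqueue(value);
    }
    bool try_dequeue(T& value) {
        for (size_t level = 0; level < Levels; ++level) {   // highest priority first
            if (queues_[level].try_dequeue(value)) return true;
        }
        return false;   // all levels empty
    }
};
```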
Extension 5: Integration with io_uring/epoll
Use the queue as event backbone:
- Network I/O completion queue
- Timer events
- File I/O results
Challenge: Beat Folly MPMCQueue
Study Facebook’s Folly implementation and identify opportunities:
- Their approach to memory reclamation
- Their backoff strategies
- Their NUMA optimizations
Can you match or exceed their throughput?
Resources
Essential Reading
| Resource | Topic | Why Important |
|---|---|---|
| “C++ Concurrency in Action” Ch. 7.3 | Lock-free queue design | Foundation of this project |
| “The Art of Multiprocessor Programming” Ch. 10 | Queues and stacks | Theoretical background |
| Folly MPMCQueue | Production implementation | Industry-standard reference |
| rigtorp/MPMCQueue | Minimal implementation | Clean, understandable code |
| 1024cores.net | Dmitry Vyukov’s design | Original sequence number technique |
Reference Implementations
- Folly MPMCQueue: Facebook’s production queue
- Boost.Lockfree: Cross-platform, well-tested
- moodycamel::ConcurrentQueue: Excellent single-header option
- crossbeam-channel (Rust): Go channel semantics in Rust
Papers
- Herlihy, “Wait-Free Synchronization” (1991)
- Michael & Scott, “Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms” (1996)
- Morrison & Afek, “Fast Concurrent Queues for x86 Processors” (2013)
Tools
| Tool | Purpose |
|---|---|
| ThreadSanitizer | Detect data races |
| Valgrind (helgrind) | Thread error detection |
| perf stat | Cache miss analysis |
| cachegrind | Detailed cache simulation |
| Intel VTune | Advanced profiling |
Self-Assessment Checklist
Before considering this project complete, verify:
Understanding (Can you explain…)
- Why sequence numbers prevent the ABA problem in this design?
- What memory ordering is required and why?
- How cache-line alignment prevents false sharing?
- The difference between lock-free and wait-free?
- Why power-of-2 size enables fast indexing?
Implementation (Did you…)
- Implement blocking enqueue/dequeue correctly?
- Implement non-blocking try_enqueue/try_dequeue?
- Add proper cache-line alignment?
- Use correct memory ordering?
- Handle all edge cases (empty, full)?
Testing (Have you verified…)
- All items delivered exactly once under stress?
- ThreadSanitizer reports no races?
- Performance beats mutex version by 5x+?
- Throughput scales with core count?
- No deadlocks after long-running tests?
Performance (Did you achieve…)
- > 8M items/second (4P/4C)?
- < 500ns average latency?
- Minimal cache misses per operation?
Submission/Completion Criteria
Your implementation is complete when:
- Functionality:
- All unit tests pass
- All concurrency tests pass (with TSan)
- Stress test runs for 1+ hour without issues
- Performance:
- Throughput >= 8M items/second (4P/4C, small payload)
- Performance scales reasonably with thread count
- Outperforms mutex-based queue by at least 5x
- Code Quality:
- Clean, well-documented implementation
- Proper use of memory ordering (justified in comments)
- No compiler warnings at -Wall -Wextra
- Documentation:
- README explaining design decisions
- Benchmark results with methodology
- Analysis of where time is spent
- Deliverables:
- mpmc_queue.hpp: Header-only library
- mpmc_test.cpp: Unit and stress tests
- mpmc_benchmark.cpp: Performance benchmarks
- README.md: Design document and results
Interview Questions You Should Be Able to Answer
After completing this project:
- “Walk me through how your MPMC queue works.”
- Explain the ring buffer, sequence numbers, and producer/consumer coordination.
- “What makes it lock-free? Is it wait-free?”
- Lock-free: Global progress guaranteed (some thread always makes progress)
- Not wait-free: Individual threads can starve under contention
- “How do you prevent the ABA problem?”
- Positions monotonically increase, never reused
- Sequence numbers encode state, checked against expected position
- “What would break if you used relaxed memory ordering everywhere?”
- Consumer might see stale/garbage data
- Works on x86 by accident, breaks on ARM
- “How would you modify this for NUMA systems?”
- Discuss per-node queues, work stealing, memory allocation strategies
- “Compare your design to Go channels and Rust crossbeam.”
- Go: Uses similar techniques but with goroutine parking
- Crossbeam: More advanced epoch-based reclamation for unbounded queues
- “What’s the throughput ceiling and what limits it?”
- Atomic contention on position counters
- Cache coherence traffic between cores
- Memory bandwidth for data copy
- “How would you debug a subtle concurrency bug in this?”
- ThreadSanitizer for race detection
- Logging with thread ID and timestamps
- Reduce to minimal reproduction case
This project represents the culmination of lock-free programming skills. Master it, and you’ll understand the concurrent primitives that power the world’s highest-performance systems.