Project 2: Thread-Safe Producer-Consumer Queue
The “Hello, World!” of practical concurrency - master the fundamental patterns that power every concurrent system
Quick Reference
| Attribute | Value |
|---|---|
| Difficulty | Advanced |
| Time Estimate | 1-2 weeks |
| Language | C++17 |
| Prerequisites | C++ fundamentals, basic threading concepts, templates |
| Key Topics | std::mutex, std::condition_variable, race conditions, deadlock prevention |
| Primary Tool | std::thread, std::mutex, std::condition_variable |
| Main Book | “C++ Concurrency in Action, 2nd Edition” by Anthony Williams |
1. Learning Objectives
By completing this project, you will be able to:
- Implement thread-safe data structures using mutexes and condition variables without data races
- Explain the producer-consumer pattern and when it’s appropriate vs other concurrency models
- Handle spurious wakeups correctly using predicate-based waiting
- Design graceful shutdown mechanisms that don’t leave threads hanging or data corrupted
- Debug race conditions using ThreadSanitizer and systematic reasoning
- Answer concurrency interview questions with deep practical understanding
- Choose between different synchronization primitives (mutex vs atomic vs lock-free) based on tradeoffs
2. Theoretical Foundation
2.1 Core Concepts
What is a Race Condition?
A race condition occurs when two or more threads access shared data concurrently, and at least one of them writes. The result depends on the unpredictable timing of thread execution.
Thread 1 Thread 2 counter
──────── ──────── ───────
read counter (0) 0
read counter (0) 0
increment (0 + 1 = 1) 0
increment (0 + 1 = 1) 0
write counter (1) 1
write counter (1) 1
Expected: 2, Actual: 1 - DATA RACE!
The Mutex (Mutual Exclusion)
A mutex ensures only one thread can access a critical section at a time:
Thread State Machine with Mutex
┌─────────────────┐
│ Running │
│ (no lock held) │
└────────┬────────┘
│ lock()
▼
┌─────────────────┐ lock() by
│ Waiting for │◄────────────┐
│ Mutex │ │
└────────┬────────┘ │
│ acquired │
▼ │
┌─────────────────┐ │
│ Holding Lock │─────────────┘
│ (critical sect) │ another thread
└────────┬────────┘ tries to lock
│ unlock()
▼
┌─────────────────┐
│ Running │
│ (lock freed) │
└─────────────────┘
The Condition Variable
A condition variable allows a thread to efficiently wait for a condition to become true without busy-waiting (spinning):
Condition Variable Wait/Notify Flow
Producer Thread Consumer Thread
═══════════════ ═══════════════
lock(mutex) lock(mutex)
│
push(item) ▼
│ ┌─────────────────┐
│ │ cv.wait(lock, │
▼ │ predicate) │
cv.notify_one() ─────────────────────────────►│ │
│ wakeup signal │ if (!pred): │
│ │ unlock mutex │
unlock(mutex) │ sleep │
│ ... zzz ... │
│ [notified] │
│ lock mutex │
│ check pred │
│ if true: exit │
└────────┬────────┘
│
▼
pop(item)
unlock(mutex)
Why std::unique_lock with condition_variable?
std::lock_guard can’t be used with condition variables because:
- `cv.wait()` must unlock the mutex while sleeping
- `cv.wait()` must relock the mutex before returning
- `std::lock_guard` has no unlock mechanism
// WRONG - won't compile
std::lock_guard<std::mutex> lock(m);
cv.wait(lock, predicate); // Error: lock_guard can't be unlocked
// CORRECT - unique_lock supports unlock/lock
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, predicate); // Works: unique_lock can unlock internally
Spurious Wakeups
A thread waiting on a condition variable can wake up even when not notified. This is an allowed optimization in many OS implementations.
WHY SPURIOUS WAKEUPS HAPPEN
OS scheduler might wake threads for:
- System signals
- Scheduler optimizations
- Multiprocessor sync issues
- Implementation details of pthread/futex
SOLUTION: Always check the condition in a loop
// WRONG - vulnerable to spurious wakeup
cv.wait(lock); // Woke up, but is data actually ready?
process(data); // CRASH: data might not be ready!
// CORRECT - predicate checked after every wakeup
cv.wait(lock, []{ return !queue.empty(); }); // Re-checks each time
process(data); // Safe: data is definitely ready
2.2 Why This Matters
The producer-consumer pattern appears everywhere in software:
| Domain | Producer | Consumer | Queue |
|---|---|---|---|
| Web servers | Network I/O thread | Worker threads | Request queue |
| GUI apps | Event loop | UI renderer | Event queue |
| Logging | Application threads | Log writer | Log buffer |
| Databases | Query parser | Query executor | Query queue |
| Video games | Input handler | Game loop | Input events |
| Message queues | Kafka producers | Kafka consumers | Topic partitions |
Performance impact: A well-implemented concurrent queue is the difference between a system that scales linearly with cores and one that becomes slower as you add CPUs.
2.3 Historical Context
The producer-consumer problem was formalized by Dijkstra in 1965 as one of the classical synchronization problems, alongside the dining philosophers and readers-writers problems.
Evolution of Synchronization in C++
1998 C++98 │ No threading support (use pthreads)
│ │
2011 C++11 ──┼── std::thread, std::mutex, std::condition_variable
│ │ First standard threading support!
│ │
2014 C++14 ──┼── shared_timed_mutex (reader-writer locks)
│ │
2017 C++17 ──┼── std::scoped_lock (deadlock-free multi-lock)
│ │ std::shared_mutex
│ │
2020 C++20 ──┼── std::jthread (auto-joining thread)
│ │ std::latch, std::barrier, std::semaphore
│ │ std::atomic_ref, std::atomic<shared_ptr>
│ │
2023+ Future ──┼── Executors proposal (still in progress)
2.4 Common Misconceptions
Misconception 1: “Volatile makes variables thread-safe”
Reality: volatile only prevents compiler optimization, not CPU reordering or atomicity. Use std::atomic for thread safety.
Misconception 2: “I only need a mutex around writes”
Reality: Reads also need protection. Without a mutex, a read can see partially-written data.
Misconception 3: “More mutexes = more parallelism”
Reality: Fine-grained locking adds overhead and deadlock risk. Start coarse, optimize only when profiling shows contention.
Misconception 4: “Lock-free is always faster”
Reality: Lock-free algorithms are harder to implement correctly and can be slower under low contention due to failed CAS loops.
Misconception 5: “notify_one() always wakes exactly one thread”
Reality: The specification says it wakes “at least one” thread. The implementation may wake more.
3. Project Specification
3.1 What You Will Build
A production-quality templated thread-safe queue that:
- Allows multiple producer threads to push items concurrently
- Allows multiple consumer threads to pop items concurrently
- Blocks consumers when the queue is empty (no busy-waiting)
- Supports graceful shutdown that wakes all waiting consumers
- Optionally supports a maximum capacity with blocking push
3.2 Functional Requirements
| ID | Requirement | Priority |
|---|---|---|
| F1 | push(T value) adds an item to the queue thread-safely | Must |
| F2 | pop() removes and returns an item, blocking if empty | Must |
| F3 | try_pop() returns immediately with optional value | Should |
| F4 | shutdown() signals all consumers to stop waiting | Must |
| F5 | Queue works with any copyable/movable type T | Must |
| F6 | Multiple producers can push concurrently | Must |
| F7 | Multiple consumers can pop concurrently | Must |
| F8 | Bounded capacity with blocking push | Could |
| F9 | size() returns approximate current size | Could |
3.3 Non-Functional Requirements
| ID | Requirement | Metric |
|---|---|---|
| NF1 | No data races | ThreadSanitizer clean |
| NF2 | No deadlocks | All threads eventually make progress |
| NF3 | No busy-waiting | CPU usage near 0% when idle |
| NF4 | Minimal contention | Lock held only during queue operations |
| NF5 | Exception safe | Strong guarantee for push |
3.4 Example Usage / Output
#include "thread_safe_queue.hpp"
#include <thread>
#include <iostream>
int main() {
ThreadSafeQueue<int> queue;
std::atomic<int> sum{0};
// Producer threads
auto producer = [&](int start) {
for (int i = start; i < start + 100; ++i) {
queue.push(i);
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
};
// Consumer threads
auto consumer = [&]() {
while (true) {
auto item = queue.pop();
if (!item.has_value()) break; // Shutdown signal
sum += *item;
}
};
std::thread p1(producer, 0);
std::thread p2(producer, 100);
std::thread c1(consumer);
std::thread c2(consumer);
p1.join();
p2.join();
queue.shutdown();
c1.join();
c2.join();
std::cout << "Sum: " << sum << std::endl;
// Expected: 0+1+2+...+199 = 19900
}
Expected output:
$ ./producer_consumer_demo
Producer 0 pushed: 0, 1, 2, ...
Producer 1 pushed: 100, 101, 102, ...
Consumer 0 popped: 0
Consumer 1 popped: 100
Consumer 0 popped: 1
...
Producer 0 finished
Producer 1 finished
Queue shutting down...
Consumer 0 shutting down
Consumer 1 shutting down
Sum: 19900
All threads joined successfully
3.5 Real World Outcome
When complete, you’ll have a reusable component that can power:
Your ThreadSafeQueue Powers:
┌─────────────┐ ┌─────────────────┐ ┌─────────────┐
│ Network │ │ │ │ Worker │
│ Listener │────►│ ThreadSafe- │────►│ Thread │
│ │ │ Queue<Request> │ │ Pool │
└─────────────┘ │ │ └─────────────┘
└─────────────────┘
┌─────────────┐ ┌─────────────────┐ ┌─────────────┐
│ App Code │ │ │ │ Log File │
│ (logging) │────►│ ThreadSafe- │────►│ Writer │
│ │ │ Queue<LogMsg> │ │ │
└─────────────┘ └─────────────────┘ └─────────────┘
┌─────────────┐ ┌─────────────────┐ ┌─────────────┐
│ Event │ │ │ │ Event │
│ Sources │────►│ ThreadSafe- │────►│ Handlers │
│ │ │ Queue<Event> │ │ │
└─────────────┘ └─────────────────┘ └─────────────┘
4. Solution Architecture
4.1 High-Level Design
Thread-Safe Queue Architecture
══════════════════════════════
┌───────────────────────────────────────────────────┐
│ ThreadSafeQueue<T> │
│ │
Producers │ ┌─────────┐ ┌──────────────────────────────┐ │ Consumers
═════════ │ │ │ │ Internal Queue │ │ ═════════
│ │ Mutex │ │ │ │
┌─────────┐ │ │ (m) │ │ ┌────┬────┬────┬────┬────┐ │ │ ┌─────────┐
│Thread 1 │─┐ │ │ │ │ │ T1 │ T2 │ T3 │ T4 │ T5 │ │ │ ┌─│Thread A │
└─────────┘ │ │ └────┬────┘ │ └────┴────┴────┴────┴────┘ │ │ │ └─────────┘
┌─────────┐ ├──┼───────┼────────┤ std::queue<T> q │──┼─┤
│Thread 2 │─┤ │ │ │ │ │ │ ┌─────────┐
└─────────┘ │ │ ▼ └──────────────────────────────┘ │ ├─│Thread B │
┌─────────┐ │ │ ┌─────────┐ │ │ └─────────┘
│Thread 3 │─┘ │ │Condition│ ┌──────────────────────────────┐ │ │
└─────────┘ │ │Variable │ │ Shutdown Flag │ │ │ ┌─────────┐
│ │ (cv) │ │ bool is_shutdown │ │ └─│Thread C │
push(T) │ └─────────┘ └──────────────────────────────┘ │ └─────────┘
─────────► │ │ │ │ ◄─────────
│ └─────────────────────┘ │ pop()
│ notify_one/all() │
└───────────────────────────────────────────────────┘
4.2 Key Components
| Component | Purpose | Implementation |
|---|---|---|
| `std::queue<T> q` | Stores the actual data | FIFO ordering, not thread-safe |
| `std::mutex m` | Protects all shared state | Guards `q`, `is_shutdown` |
| `std::condition_variable cv` | Consumer notification | Wakes sleeping consumers |
| `bool is_shutdown` | Graceful shutdown flag | Checked after wakeup |
4.3 Data Structures
template<typename T>
class ThreadSafeQueue {
private:
// The underlying queue (not thread-safe by itself)
std::queue<T> q;
// Mutex to protect ALL shared state
mutable std::mutex m;
// Condition variable for consumer notification
std::condition_variable cv;
// Shutdown flag
bool is_shutdown = false;
public:
void push(T value);
std::optional<T> pop();
std::optional<T> try_pop();
void shutdown();
bool empty() const;
size_t size() const;
};
4.4 Algorithm Overview: Wait/Notify Pattern
push() Algorithm:
1. Lock mutex
2. Push item to queue
3. Unlock mutex (via RAII)
4. Notify one waiting consumer
pop() Algorithm:
1. Lock mutex (unique_lock for cv.wait)
2. Wait on CV with predicate: (!queue.empty() || is_shutdown)
- If predicate false: unlock mutex, sleep
- On wakeup: relock mutex, recheck predicate
- If predicate true: continue
3. If shutdown and queue empty: return nullopt
4. Pop item from queue
5. Unlock mutex (via RAII)
6. Return item
shutdown() Algorithm:
1. Lock mutex
2. Set is_shutdown = true
3. Unlock mutex
4. Notify ALL waiting consumers
State Diagram for Consumer Thread
┌────────────────────────┐
│ RUNNING │
│ (not holding lock) │
└───────────┬────────────┘
│
│ call pop()
▼
┌────────────────────────┐
│ ACQUIRING LOCK │
│ unique_lock(mutex) │
└───────────┬────────────┘
│
│ lock acquired
▼
┌────────────────────────────────────┐
│ CHECK PREDICATE │
│ (!queue.empty() || is_shutdown) │
└────────────────┬───────────────────┘
│
┌─────────────────────┼─────────────────────┐
│ false │ true │
▼ │ ▼
┌────────────────────┐ │ ┌────────────────────┐
│ WAITING │ │ │ LOCK HELD │
│ unlock mutex │ │ │ check shutdown │
│ sleep on cv │ │ └────────┬───────────┘
│ ... │ │ │
│ [notified] │ │ ┌────────────┴────────────┐
│ relock mutex │─────────┘ │ │
└────────────────────┘ ▼ ▼
shutdown && empty data available
┌─────────────┐ ┌─────────────┐
│return nullopt│ │ pop & return│
└─────────────┘ └─────────────┘
5. Implementation Guide
5.1 Development Environment Setup
# Ubuntu/Debian
sudo apt install g++-11 cmake valgrind
# Verify C++17 support
g++ --version # Should be 7+ for C++17
# Install ThreadSanitizer (usually included with GCC)
# Verify with:
echo 'int main(){}' | g++ -fsanitize=thread -x c++ -
# macOS
brew install llvm cmake
# Verify
clang++ --version # Should be 10+ for good C++17
5.2 Project Structure
thread_safe_queue/
├── include/
│ └── thread_safe_queue.hpp # Header-only implementation
├── src/
│ └── main.cpp # Demo/test driver
├── tests/
│ ├── unit_tests.cpp # Single-threaded correctness tests
│ ├── concurrent_tests.cpp # Multi-threaded stress tests
│ └── benchmark.cpp # Performance tests
├── CMakeLists.txt
└── README.md
CMakeLists.txt:
cmake_minimum_required(VERSION 3.14)
project(ThreadSafeQueue)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# Enable ThreadSanitizer for debug builds
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=thread -g")
set(CMAKE_EXE_LINKER_FLAGS_DEBUG "${CMAKE_EXE_LINKER_FLAGS_DEBUG} -fsanitize=thread")
add_executable(demo src/main.cpp)
target_include_directories(demo PRIVATE include)
target_link_libraries(demo pthread)
add_executable(tests tests/concurrent_tests.cpp)
target_include_directories(tests PRIVATE include)
target_link_libraries(tests pthread)
5.3 The Core Question You’re Answering
“How do you safely share a data structure between multiple threads while avoiding both data races and busy-waiting, and how do you cleanly shut down when work is done?”
This question forces you to understand:
- Why shared mutable state is dangerous
- How mutexes provide mutual exclusion
- How condition variables provide efficient waiting
- Why predicate-based waiting is essential
- How to signal shutdown without deadlock
5.4 Concepts You Must Understand First
Before coding, ensure you can answer these questions:
| Concept | Self-Assessment Question | Where to Learn |
|---|---|---|
| Thread basics | How does `std::thread` differ from a function call? | Williams Ch. 2 |
| Mutex semantics | What happens if a thread tries to lock an already-locked mutex? | Williams Ch. 3.1 |
| RAII locking | Why use `std::lock_guard` instead of `m.lock()`/`m.unlock()`? | Williams Ch. 3.2 |
| Condition variables | Why can’t you use `lock_guard` with condition variables? | Williams Ch. 4.1 |
| Spurious wakeups | What is a spurious wakeup and why does it happen? | Williams Ch. 4.1.1 |
| Memory ordering | What does “happens-before” mean? | Williams Ch. 5 |
5.5 Questions to Guide Your Design
Data Structure Questions:
- What underlying container will you use? (std::queue, std::deque, custom?)
- Should the queue be bounded (max capacity) or unbounded?
- What happens when pop() is called on an empty queue?
Thread Safety Questions:
- Which operations need to be atomic?
- Where exactly should the mutex be locked and unlocked?
- Can multiple readers read concurrently, or does size() also need locking?
Condition Variable Questions:
- When should notify_one() vs notify_all() be used?
- What is the predicate for the consumer’s wait?
- How will you handle spurious wakeups?
Shutdown Questions:
- How do waiting consumers know to stop?
- Should shutdown() wait for the queue to empty, or return immediately?
- What should pop() return after shutdown?
5.6 Thinking Exercise
Before writing any code, trace through this scenario by hand:
Initial State: Queue is empty, 2 consumers waiting
Time Producer Queue Consumer A Consumer B
──── ──────── ───── ────────── ──────────
T1 push(42) [42] waiting (cv) waiting (cv)
T2 notify_one() [42] woken, locking waiting (cv)
T3 [42] locked, popping waiting (cv)
T4 [] unlocked waiting (cv)
processing 42
Now answer:
- Why didn’t Consumer B wake up at T2?
- What would happen if we used `notify_all()` instead?
- What if the producer pushes 2 items before any consumer wakes?
5.7 Hints in Layers
Hint 1: Getting Started
Start with the simplest possible interface:
template<typename T>
class ThreadSafeQueue {
private:
std::queue<T> q;
std::mutex m;
std::condition_variable cv;
bool done = false;
public:
void push(T value);
std::optional<T> pop();
void shutdown();
};
Focus on getting push() and pop() correct first. The key insight is that push() needs to notify after releasing the lock.
Hint 2: The Pop Implementation
The critical part is the wait predicate:
std::optional<T> pop() {
std::unique_lock<std::mutex> lock(m);
// This is the KEY line - handles spurious wakeups
cv.wait(lock, [this]{ return !q.empty() || done; });
// After wait: either queue has data OR we're shutting down
if (done && q.empty()) {
return std::nullopt;
}
T value = std::move(q.front());
q.pop();
return value;
}
The lambda predicate is checked:
- Before sleeping (if true, don’t sleep at all)
- After every wakeup (if false, go back to sleep)
Hint 3: Push and Notify Timing
Should you notify inside or outside the lock?
void push(T value) {
{
std::lock_guard<std::mutex> lock(m);
q.push(std::move(value));
} // Lock released HERE
cv.notify_one(); // Notify AFTER unlocking
}
Why notify after unlocking?
- If notified thread wakes immediately, it can acquire the lock right away
- Avoids “hurry up and wait” where notified thread wakes but can’t proceed
Note: Notifying inside the lock is also correct (standard guarantees atomicity), just potentially less efficient.
Hint 4: Shutdown Mechanism
void shutdown() {
{
std::lock_guard<std::mutex> lock(m);
done = true;
}
cv.notify_all(); // Wake ALL waiting consumers
}
Why notify_all() and not notify_one()?
- All consumers need to check the `done` flag
- With `notify_one()`, only one consumer wakes; the others wait forever (deadlock)
After shutdown:
- pop() returns nullopt for waiting consumers
- Future push() calls should be handled (either assert, ignore, or queue normally)
5.8 The Interview Questions They’ll Ask
After completing this project, you should be able to answer:
- “Explain the producer-consumer problem and how you’d solve it in C++.”
- Define the problem (shared buffer, synchronization needs)
- Explain mutex for mutual exclusion
- Explain condition variable for efficient waiting
- Discuss notify_one vs notify_all
- “What’s a spurious wakeup and how do you handle it?”
- Define: waking without notification
- Cause: OS scheduler optimizations, implementation details
- Solution: always wait with a predicate in a loop
- “What’s the difference between std::lock_guard and std::unique_lock?”
- lock_guard: simple RAII, lock at construction, unlock at destruction
- unique_lock: flexible, can unlock/relock, required for condition variables
- “How would you implement a bounded queue (with max capacity)?”
- Add second condition variable for “not full”
- push() waits on not_full CV when at capacity
- pop() notifies not_full CV after removing item
- “What happens if you forget to notify after pushing?”
- Consumers may wait forever (deadlock-like)
- Queue fills up but consumers never wake
- “Why use notify_all() for shutdown instead of notify_one()?”
- All consumers need to see shutdown flag
- notify_one() would leave N-1 consumers waiting
- “How would you add timeout support to pop()?”
- Use `cv.wait_for()` or `cv.wait_until()`
- Return nullopt on timeout
- Check predicate after timeout (might have data)
- “Is your queue lock-free? Should it be?”
- No, uses mutex
- Lock-free harder to implement correctly
- Lock-based often faster under low contention
- Profile before optimizing
5.9 Books That Will Help
| Topic | Book | Chapter |
|---|---|---|
| Thread basics and std::thread | C++ Concurrency in Action | Ch. 2: Managing threads |
| Mutexes and protecting shared data | C++ Concurrency in Action | Ch. 3: Sharing data between threads |
| Condition variables | C++ Concurrency in Action | Ch. 4: Synchronizing concurrent operations |
| Memory model and atomics | C++ Concurrency in Action | Ch. 5: The C++ memory model and operations |
| Lock-free alternatives | C++ Concurrency in Action | Ch. 7: Designing lock-free data structures |
| RAII and exception safety | Effective C++ | Item 13: Use objects to manage resources |
| Move semantics for efficiency | Effective Modern C++ | Items 23-25: Move semantics |
5.10 Implementation Phases
Phase 1: Basic Queue (Days 1-2)
Goal: Single producer, single consumer, no shutdown
// Start with the absolute minimum
template<typename T>
class ThreadSafeQueue {
std::queue<T> q;
std::mutex m;
std::condition_variable cv;
public:
void push(T value) {
std::lock_guard<std::mutex> lock(m);
q.push(std::move(value));
cv.notify_one();
}
T pop() { // Blocking pop, returns value directly
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [this]{ return !q.empty(); });
T value = std::move(q.front());
q.pop();
return value;
}
};
Checkpoint: Test with one producer pushing 1000 items, one consumer popping 1000 items. Verify all items received.
Phase 2: Multi-Producer Multi-Consumer (Days 3-4)
Goal: Multiple producers and consumers work correctly
Changes:
- Same code works! (mutex ensures mutual exclusion)
- Add stress test with N producers, M consumers
Checkpoint: Run ThreadSanitizer, verify no data races:
g++ -std=c++17 -fsanitize=thread -g tests.cpp -lpthread
./a.out # Should report no errors
Phase 3: Shutdown Support (Days 5-6)
Goal: Clean shutdown without hanging threads
template<typename T>
class ThreadSafeQueue {
// ... previous members ...
bool is_shutdown = false;
public:
void shutdown() {
{
std::lock_guard<std::mutex> lock(m);
is_shutdown = true;
}
cv.notify_all();
}
std::optional<T> pop() {
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [this]{ return !q.empty() || is_shutdown; });
if (is_shutdown && q.empty()) {
return std::nullopt;
}
T value = std::move(q.front());
q.pop();
return value;
}
};
Checkpoint: Producers finish, call shutdown(), all consumers exit cleanly.
Phase 4: try_pop and Additional Features (Days 7-8)
Goal: Non-blocking pop, size query
std::optional<T> try_pop() {
std::lock_guard<std::mutex> lock(m);
if (q.empty()) {
return std::nullopt;
}
T value = std::move(q.front());
q.pop();
return value;
}
bool empty() const {
std::lock_guard<std::mutex> lock(m);
return q.empty();
}
size_t size() const {
std::lock_guard<std::mutex> lock(m);
return q.size();
}
Note: size() is inherently racy - by the time you use the result, it may have changed.
Phase 5: Bounded Queue (Days 9-10) - Optional
Goal: Maximum capacity with blocking push
template<typename T>
class BoundedQueue {
std::queue<T> q;
std::mutex m;
std::condition_variable not_empty; // Consumer waits on this
std::condition_variable not_full; // Producer waits on this
size_t capacity;
bool is_shutdown = false;
public:
explicit BoundedQueue(size_t cap) : capacity(cap) {}
void push(T value) {
std::unique_lock<std::mutex> lock(m);
not_full.wait(lock, [this]{ return q.size() < capacity || is_shutdown; });
if (is_shutdown) return; // or throw
q.push(std::move(value));
not_empty.notify_one();
}
std::optional<T> pop() {
std::unique_lock<std::mutex> lock(m);
not_empty.wait(lock, [this]{ return !q.empty() || is_shutdown; });
if (is_shutdown && q.empty()) return std::nullopt;
T value = std::move(q.front());
q.pop();
not_full.notify_one(); // Signal producer that space available
return value;
}
};
5.11 Key Implementation Decisions
| Decision | Options | Recommendation | Rationale |
|---|---|---|---|
| Return type of pop() | `T` vs `optional<T>` | `optional<T>` | Allows signaling shutdown cleanly |
| Notification timing | Inside vs outside lock | Outside (after unlock) | Slightly more efficient |
| Underlying container | queue vs deque vs list | `std::queue<T>` | Simple, sufficient for FIFO |
| Lock type for pop() | lock_guard vs unique_lock | `unique_lock` | Required for `cv.wait()` |
| Bounded vs unbounded | Fixed cap vs unlimited | Unbounded first | Add bounds later if needed |
| Exception on shutdown push | Throw vs ignore vs queue | Document and choose | Depends on use case |
6. Testing Strategy
Unit Tests (Single-Threaded Correctness)
void test_basic_push_pop() {
ThreadSafeQueue<int> q;
q.push(42);
auto val = q.try_pop();
assert(val.has_value());
assert(*val == 42);
std::cout << "test_basic_push_pop PASSED\n";
}
void test_fifo_order() {
ThreadSafeQueue<int> q;
for (int i = 0; i < 100; ++i) {
q.push(i);
}
for (int i = 0; i < 100; ++i) {
auto val = q.try_pop();
assert(val.has_value() && *val == i);
}
std::cout << "test_fifo_order PASSED\n";
}
void test_empty_try_pop() {
ThreadSafeQueue<int> q;
auto val = q.try_pop();
assert(!val.has_value());
std::cout << "test_empty_try_pop PASSED\n";
}
void test_shutdown_wakes_consumers() {
ThreadSafeQueue<int> q;
std::atomic<bool> woke_up{false};
std::thread consumer([&]{
auto val = q.pop(); // Blocks on empty queue
woke_up = true;
assert(!val.has_value()); // Should return nullopt
});
std::this_thread::sleep_for(std::chrono::milliseconds(100));
assert(!woke_up); // Consumer should still be waiting
q.shutdown();
consumer.join();
assert(woke_up);
std::cout << "test_shutdown_wakes_consumers PASSED\n";
}
Concurrent Stress Tests
void stress_test_multi_producer_consumer() {
constexpr int NUM_PRODUCERS = 4;
constexpr int NUM_CONSUMERS = 4;
constexpr int ITEMS_PER_PRODUCER = 10000;
ThreadSafeQueue<int> q;
std::atomic<int> total_pushed{0};
std::atomic<int> total_popped{0};
auto producer = [&](int /*id*/) { // id unused; silences -Wunused-parameter
for (int i = 0; i < ITEMS_PER_PRODUCER; ++i) {
q.push(i);
++total_pushed;
}
};
auto consumer = [&] {
while (true) {
auto val = q.pop();
if (!val.has_value()) break;
++total_popped;
}
};
std::vector<std::thread> producers, consumers;
for (int i = 0; i < NUM_PRODUCERS; ++i) {
producers.emplace_back(producer, i);
}
for (int i = 0; i < NUM_CONSUMERS; ++i) {
consumers.emplace_back(consumer);
}
for (auto& t : producers) t.join();
q.shutdown();
for (auto& t : consumers) t.join();
assert(total_pushed == NUM_PRODUCERS * ITEMS_PER_PRODUCER);
assert(total_popped == total_pushed);
std::cout << "stress_test PASSED: " << total_popped << " items\n";
}
ThreadSanitizer Verification
# Compile with ThreadSanitizer
g++ -std=c++17 -fsanitize=thread -g concurrent_tests.cpp -lpthread -o tsan_test
# Run - should report no data races
./tsan_test
# Expected output:
# stress_test PASSED: 40000 items
# (no ThreadSanitizer warnings)
Performance Benchmarks
void benchmark_throughput() {
ThreadSafeQueue<int> q;
constexpr int OPS = 1000000;
auto start = std::chrono::high_resolution_clock::now();
std::thread producer([&] {
for (int i = 0; i < OPS; ++i) {
q.push(i);
}
});
std::thread consumer([&] {
for (int i = 0; i < OPS; ++i) {
q.pop();
}
});
producer.join();
consumer.join();
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
double ops_per_sec = (OPS * 2.0) / (duration.count() / 1000.0);
std::cout << "Throughput: " << ops_per_sec << " ops/sec\n";
}
7. Common Pitfalls & Debugging
| Pitfall | Symptom | Fix |
|---|---|---|
| Forgetting to notify | Consumers hang forever | Always notify_one() after push() |
| Using lock_guard with CV | Compile error | Use unique_lock for cv.wait() |
| Notifying without lock | Potential lost wakeup | Notify is safe without lock, but set state with lock |
| Not using predicate wait | Random hangs (spurious wakeups) | Always use cv.wait(lock, predicate) |
| notify_one for shutdown | Only one consumer exits | Use notify_all() for shutdown |
| Data race on shutdown flag | TSan error, undefined behavior | Always read/write under mutex |
| Locking in wrong order | Deadlock | Consistent lock ordering |
| Forgetting move semantics | Unnecessary copies | Use std::move in push/pop |
Debugging Race Conditions
Step 1: Enable ThreadSanitizer
g++ -fsanitize=thread -g your_code.cpp -lpthread
Step 2: Interpret TSan output
WARNING: ThreadSanitizer: data race (pid=12345)
Write of size 4 at 0x... by thread T1:
#0 ThreadSafeQueue::push() at queue.hpp:42
Previous read of size 4 at 0x... by thread T2:
#0 ThreadSafeQueue::pop() at queue.hpp:58
This tells you exactly which lines have the race.
Step 3: Common TSan warnings and fixes
| TSan Warning | Likely Cause | Fix |
|---|---|---|
| Data race on bool | Shutdown flag accessed without lock | Add lock around all flag accesses |
| Data race on queue | Queue accessed without lock | Verify lock scope covers all operations |
| Lock-order-inversion | Two threads lock A,B in different orders | Establish consistent lock ordering |
Performance Debugging
High lock contention symptoms:
- CPU usage high but throughput low
- Many threads in “waiting for lock” state
Diagnosis:
// Add timing instrumentation
auto start = std::chrono::steady_clock::now();
std::lock_guard<std::mutex> lock(m);
auto acquired = std::chrono::steady_clock::now();
auto wait_time = acquired - start;
// Log wait_time if > threshold
Solutions:
- Reduce critical section size
- Use try_lock with fallback
- Consider lock-free queue if profiling shows contention
8. Extensions & Challenges
Beginner Extensions
- Add timeout to pop(): use `cv.wait_for()` with a duration
- Add statistics: track total items pushed/popped
- Add peek(): Look at front without removing
- Thread-safe iteration: Snapshot the queue for debugging
Intermediate Extensions
- Bounded queue: Max capacity with blocking push
- Priority queue: Items popped by priority, not FIFO
- Multiple queues: Route to queues based on item type
- Wait for empty: Block until all items consumed
Advanced Extensions
- Lock-free implementation: Use atomics instead of mutex
- Work-stealing: Allow idle consumers to steal from other queues
- Batch operations: push_many() / pop_many() for efficiency
- NUMA-aware: Separate queues per NUMA node
Research Extensions
- Formal verification: Prove correctness using model checker
- Benchmark comparison: Compare with boost::lockfree, folly::MPMCQueue
- Adaptive spinning: Spin briefly before sleeping on CV
9. Real-World Connections
Industry Applications
| System | How It Uses Producer-Consumer |
|---|---|
| Apache Kafka | Topics are queues, producers push messages, consumer groups pop |
| Redis | LIST data type with BLPOP/BRPOP for blocking queues |
| RabbitMQ | Message broker built on queue semantics |
| Go channels | Language-level bounded producer-consumer queues |
| Java BlockingQueue | java.util.concurrent provides several implementations |
Related Open Source Projects
- folly::MPMCQueue: Facebook’s multi-producer multi-consumer queue (lock-free)
- boost::lockfree::queue: Boost’s lock-free queue implementation
- moodycamel::ConcurrentQueue: Popular single-header lock-free queue
- tbb::concurrent_queue: Intel Threading Building Blocks
Interview Relevance
This project prepares you for questions at companies like:
| Company | Relevant Domain |
|---|---|
| Meta | Async event processing in React Native |
| RPC systems, MapReduce task queues | |
| Amazon | SQS, internal message passing |
| Microsoft | Windows thread pool, async I/O |
| Trading firms | Order book updates, market data |
10. Resources
Essential Reading
| Resource | What You’ll Learn |
|---|---|
| “C++ Concurrency in Action” Ch. 3-4 | Deep dive on mutexes, condition variables |
| cppreference.com condition_variable | API details and edge cases |
| Herb Sutter’s “Effective Concurrency” series | Design patterns and pitfalls |
Video Resources
- CppCon 2014: “Lock-Free Programming” by Herb Sutter
- CppCon 2017: “C++ atomics: from basic to advanced” by Fedor Pikus
- MIT 6.172: Performance Engineering lectures on concurrency
Tools
- ThreadSanitizer: Data race detector (built into GCC/Clang)
- Helgrind: Valgrind tool for concurrency bugs
- rr: Record and replay debugger (great for race conditions)
- Intel Inspector: Commercial tool for threading analysis
Reference Implementations
- C++ Standard: look at the `<mutex>` and `<condition_variable>` headers
- libstdc++: GCC’s implementation in `bits/std_mutex.h`
- libc++: Clang’s implementation in `__threading_support`
11. Self-Assessment Checklist
Before considering this project complete, verify:
Understanding
- I can explain why both mutex and condition variable are needed
- I can describe what happens during cv.wait() step by step
- I can explain spurious wakeups and the predicate solution
- I understand why unique_lock is required for condition variables
- I can explain the difference between notify_one and notify_all
Implementation
- Push and pop work correctly with single producer/consumer
- Multiple producers and consumers work without data corruption
- Shutdown wakes all waiting consumers
- ThreadSanitizer reports no data races
- CPU usage is near 0% when queue is empty (no busy-waiting)
Quality
- Code compiles with -Wall -Wextra without warnings
- Exception safety: push provides strong guarantee
- Move semantics used where appropriate
- Header-only implementation works across translation units
Verification
- All unit tests pass
- Stress tests pass consistently
- Tested with ThreadSanitizer
- Tested shutdown under various conditions
12. Submission / Completion Criteria
Minimum Viable Completion
- `push()` and `pop()` work correctly
- Works with multiple producers and consumers
- No data races (ThreadSanitizer clean)
- Basic test coverage
Full Completion
- All of the above, plus:
- `try_pop()` implemented
- `shutdown()` wakes all consumers
- `empty()` and `size()` work
- Comprehensive test coverage
- Clean code with comments
Excellence (Going Above & Beyond)
- Bounded queue variant
- Timeout support for pop
- Performance benchmarks
- Comparison with std::mutex alternatives (shared_mutex)
- Lock-free implementation attempt
Next Project: P03-compile-time-units.md - Template Metaprogramming
This guide was expanded from LEARN_ADVANCED_CPP_DEEP_DIVE.md. For the complete learning path, see the README.