Project 2: Thread-Safe Producer-Consumer Queue

The “Hello, World!” of practical concurrency - master the fundamental patterns that power every concurrent system


Quick Reference

Attribute       Value
─────────       ─────
Difficulty      Advanced
Time Estimate   1-2 weeks
Language        C++17
Prerequisites   C++ fundamentals, basic threading concepts, templates
Key Topics      std::mutex, std::condition_variable, race conditions, deadlock prevention
Primary Tools   std::thread, std::mutex, std::condition_variable
Main Book       “C++ Concurrency in Action, 2nd Edition” by Anthony Williams

1. Learning Objectives

By completing this project, you will be able to:

  1. Implement thread-safe data structures using mutexes and condition variables without data races
  2. Explain the producer-consumer pattern and when it’s appropriate vs other concurrency models
  3. Handle spurious wakeups correctly using predicate-based waiting
  4. Design graceful shutdown mechanisms that don’t leave threads hanging or data corrupted
  5. Debug race conditions using ThreadSanitizer and systematic reasoning
  6. Answer concurrency interview questions with deep practical understanding
  7. Choose between different synchronization primitives (mutex vs atomic vs lock-free) based on tradeoffs

2. Theoretical Foundation

2.1 Core Concepts

What is a Race Condition?

A race condition occurs when the outcome of a program depends on the unpredictable timing of thread execution. Its most common form is the data race: two or more threads access shared data concurrently, and at least one of the accesses is a write.

Thread 1                    Thread 2                    counter
────────                    ────────                    ───────
read counter (0)                                          0
                            read counter (0)              0
increment (0 + 1 = 1)                                     0
                            increment (0 + 1 = 1)         0
write counter (1)                                         1
                            write counter (1)             1

Expected: 2, Actual: 1 - DATA RACE!
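
To see this in real code, here is a minimal sketch (separate from the project code) that reproduces the race; built with -fsanitize=thread, ThreadSanitizer flags it immediately:

#include <iostream>
#include <thread>

int main() {
    int counter = 0;  // shared, unprotected

    auto work = [&counter] {
        for (int i = 0; i < 100000; ++i) {
            ++counter;  // read-modify-write, not atomic: the race from the trace above
        }
    };

    std::thread t1(work);
    std::thread t2(work);
    t1.join();
    t2.join();

    // Expected 200000, but because of the data race almost any value can appear.
    std::cout << "counter = " << counter << '\n';
}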

The Mutex (Mutual Exclusion)

A mutex ensures only one thread can access a critical section at a time:

Thread State Machine with Mutex

     ┌─────────────────┐
     │    Running      │
     │  (no lock held) │
     └────────┬────────┘
              │ lock()
              ▼
     ┌─────────────────┐    lock() by
     │   Waiting for   │◄────────────┐
     │     Mutex       │             │
     └────────┬────────┘             │
              │ acquired             │
              ▼                      │
     ┌─────────────────┐             │
     │   Holding Lock  │─────────────┘
     │ (critical sect) │  another thread
     └────────┬────────┘  tries to lock
              │ unlock()
              ▼
     ┌─────────────────┐
     │    Running      │
     │  (lock freed)   │
     └─────────────────┘
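
Guarding the same counter with a mutex removes the race. A minimal sketch (std::lock_guard acquires the mutex on construction and releases it on destruction, i.e. RAII):

#include <iostream>
#include <mutex>
#include <thread>

int main() {
    int counter = 0;
    std::mutex m;

    auto work = [&] {
        for (int i = 0; i < 100000; ++i) {
            std::lock_guard<std::mutex> lock(m);  // acquire: only one thread enters at a time
            ++counter;                            // critical section
        }                                         // released automatically at end of scope
    };

    std::thread t1(work);
    std::thread t2(work);
    t1.join();
    t2.join();

    std::cout << "counter = " << counter << '\n';  // always 200000
}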

The Condition Variable

A condition variable allows a thread to efficiently wait for a condition to become true without busy-waiting (spinning):

Condition Variable Wait/Notify Flow

Producer Thread                              Consumer Thread
═══════════════                              ═══════════════

lock(mutex)                                  lock(mutex)
                                                   │
push(item)                                         ▼
    │                                         ┌─────────────────┐
    │                                         │ cv.wait(lock,   │
    ▼                                         │   predicate)    │
cv.notify_one() ─────────────────────────────►│                 │
    │             wakeup signal               │ if (!pred):     │
    │                                         │   unlock mutex  │
unlock(mutex)                                 │   sleep         │
                                              │   ... zzz ...   │
                                              │   [notified]    │
                                              │   lock mutex    │
                                              │   check pred    │
                                              │   if true: exit │
                                              └────────┬────────┘
                                                       │
                                                       ▼
                                              pop(item)
                                              unlock(mutex)
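
In code, the handshake in the diagram looks roughly like this stripped-down, single-item sketch (the project's queue generalizes it to many items and threads):

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

std::queue<int> q;
std::mutex m;
std::condition_variable cv;

int main() {
    std::thread consumer([] {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [] { return !q.empty(); });  // releases m while asleep, re-checks on wakeup
        std::cout << "got " << q.front() << '\n';
        q.pop();
    });

    std::thread producer([] {
        {
            std::lock_guard<std::mutex> lock(m);
            q.push(42);
        }                  // release the lock first...
        cv.notify_one();   // ...then wake the consumer
    });

    producer.join();
    consumer.join();
}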

Why std::unique_lock with condition_variable?

std::lock_guard can’t be used with condition variables because:

  1. cv.wait() must unlock the mutex while sleeping
  2. cv.wait() must relock the mutex before returning
  3. std::lock_guard has no unlock mechanism

// WRONG - won't compile
std::lock_guard<std::mutex> lock(m);
cv.wait(lock, predicate);  // Error: lock_guard can't be unlocked

// CORRECT - unique_lock supports unlock/lock
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, predicate);  // Works: unique_lock can unlock internally

Spurious Wakeups

A thread waiting on a condition variable can wake up even when not notified. This is an allowed optimization in many OS implementations.

WHY SPURIOUS WAKEUPS HAPPEN

OS scheduler might wake threads for:
- System signals
- Scheduler optimizations
- Multiprocessor sync issues
- Implementation details of pthread/futex

SOLUTION: Always check the condition in a loop

// WRONG - vulnerable to spurious wakeup
cv.wait(lock);  // Woke up, but is data actually ready?
process(data);  // CRASH: data might not be ready!

// CORRECT - predicate checked after every wakeup
cv.wait(lock, [&]{ return !queue.empty(); });  // Re-checks each time
process(data);  // Safe: data is definitely ready

2.2 Why This Matters

The producer-consumer pattern appears everywhere in software:

Domain           Producer               Consumer           Queue
──────           ────────               ────────           ─────
Web servers      Network I/O thread     Worker threads     Request queue
GUI apps         Event loop             UI renderer        Event queue
Logging          Application threads    Log writer         Log buffer
Databases        Query parser           Query executor     Query queue
Video games      Input handler          Game loop          Input events
Message queues   Kafka producers        Kafka consumers    Topic partitions

Performance impact: A well-implemented concurrent queue can be the difference between a system that scales with core count and one that gets slower as you add CPUs.

2.3 Historical Context

The producer-consumer problem was formalized by Dijkstra in 1965 as one of the classical synchronization problems, alongside the dining philosophers and readers-writers problems.

Evolution of Synchronization in C++

1998   C++98    │ No threading support (use pthreads)
       │        │
2011   C++11  ──┼── std::thread, std::mutex, std::condition_variable
       │        │   First standard threading support!
       │        │
2014   C++14  ──┼── shared_timed_mutex (reader-writer locks)
       │        │
2017   C++17  ──┼── std::scoped_lock (deadlock-free multi-lock)
       │        │   std::shared_mutex
       │        │
2020   C++20  ──┼── std::jthread (auto-joining thread)
       │        │   std::latch, std::barrier, std::counting_semaphore
       │        │   std::atomic_ref, std::atomic<std::shared_ptr>
       │        │
2023+  Future ──┼── Executors proposal (still in progress)

2.4 Common Misconceptions

Misconception 1: “Volatile makes variables thread-safe” Reality: volatile only tells the compiler not to optimize away or cache accesses; it provides neither atomicity nor ordering between threads. Use std::atomic for thread safety.
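
For the racy counter from Section 2.1, the correct fix is std::atomic, not volatile. A minimal sketch:

#include <atomic>
#include <iostream>
#include <thread>

int main() {
    // volatile int counter = 0;   // still a data race if two threads increment it
    std::atomic<int> counter{0};   // atomic read-modify-write, no mutex needed

    auto work = [&] {
        for (int i = 0; i < 100000; ++i) ++counter;
    };
    std::thread t1(work);
    std::thread t2(work);
    t1.join();
    t2.join();

    std::cout << counter << '\n';  // always 200000
}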

Misconception 2: “I only need a mutex around writes” Reality: Reads also need protection. Without a mutex, a read can see partially-written data.

Misconception 3: “More mutexes = more parallelism” Reality: Fine-grained locking adds overhead and deadlock risk. Start coarse, optimize only when profiling shows contention.

Misconception 4: “Lock-free is always faster” Reality: Lock-free algorithms are harder to implement correctly and can be slower under low contention due to failed CAS loops.

Misconception 5: “notify_one() always wakes exactly one thread” Reality: The underlying primitives (e.g. POSIX) only guarantee that at least one waiter is unblocked, and spurious wakeups mean additional threads may resume as well - never assume exactly one consumer wakes.


3. Project Specification

3.1 What You Will Build

A production-quality templated thread-safe queue that:

  • Allows multiple producer threads to push items concurrently
  • Allows multiple consumer threads to pop items concurrently
  • Blocks consumers when the queue is empty (no busy-waiting)
  • Supports graceful shutdown that wakes all waiting consumers
  • Optionally supports a maximum capacity with blocking push

3.2 Functional Requirements

ID   Requirement                                              Priority
──   ───────────                                              ────────
F1   push(T value) adds an item to the queue thread-safely    Must
F2   pop() removes and returns an item, blocking if empty     Must
F3   try_pop() returns immediately with optional value        Should
F4   shutdown() signals all consumers to stop waiting         Must
F5   Queue works with any copyable/movable type T             Must
F6   Multiple producers can push concurrently                 Must
F7   Multiple consumers can pop concurrently                  Must
F8   Bounded capacity with blocking push                      Could
F9   size() returns approximate current size                  Could

3.3 Non-Functional Requirements

ID    Requirement          Metric
──    ───────────          ──────
NF1   No data races        ThreadSanitizer clean
NF2   No deadlocks         All threads eventually make progress
NF3   No busy-waiting      CPU usage near 0% when idle
NF4   Minimal contention   Lock held only during queue operations
NF5   Exception safe       Strong guarantee for push

3.4 Example Usage / Output

#include "thread_safe_queue.hpp"
#include <thread>
#include <iostream>

int main() {
    ThreadSafeQueue<int> queue;
    std::atomic<int> sum{0};

    // Producer threads
    auto producer = [&](int start) {
        for (int i = start; i < start + 100; ++i) {
            queue.push(i);
            std::this_thread::sleep_for(std::chrono::microseconds(100));
        }
    };

    // Consumer threads
    auto consumer = [&]() {
        while (true) {
            auto item = queue.pop();
            if (!item.has_value()) break;  // Shutdown signal
            sum += *item;
        }
    };

    std::thread p1(producer, 0);
    std::thread p2(producer, 100);
    std::thread c1(consumer);
    std::thread c2(consumer);

    p1.join();
    p2.join();

    queue.shutdown();

    c1.join();
    c2.join();

    std::cout << "Sum: " << sum << std::endl;
    // Expected: 0+1+2+...+199 = 19900
}

Expected output (from a build of the demo instrumented to log each push/pop):

$ ./producer_consumer_demo
Producer 0 pushed: 0, 1, 2, ...
Producer 1 pushed: 100, 101, 102, ...
Consumer 0 popped: 0
Consumer 1 popped: 100
Consumer 0 popped: 1
...
Producer 0 finished
Producer 1 finished
Queue shutting down...
Consumer 0 shutting down
Consumer 1 shutting down
Sum: 19900
All threads joined successfully

3.5 Real World Outcome

When complete, you’ll have a reusable component that can power:

Your ThreadSafeQueue Powers:

┌─────────────┐     ┌─────────────────┐     ┌─────────────┐
│  Network    │     │                 │     │   Worker    │
│  Listener   │────►│  ThreadSafe-    │────►│   Thread    │
│             │     │  Queue<Request> │     │   Pool      │
└─────────────┘     │                 │     └─────────────┘
                    └─────────────────┘

┌─────────────┐     ┌─────────────────┐     ┌─────────────┐
│  App Code   │     │                 │     │  Log File   │
│  (logging)  │────►│  ThreadSafe-    │────►│  Writer     │
│             │     │  Queue<LogMsg>  │     │             │
└─────────────┘     └─────────────────┘     └─────────────┘

┌─────────────┐     ┌─────────────────┐     ┌─────────────┐
│  Event      │     │                 │     │  Event      │
│  Sources    │────►│  ThreadSafe-    │────►│  Handlers   │
│             │     │  Queue<Event>   │     │             │
└─────────────┘     └─────────────────┘     └─────────────┘

4. Solution Architecture

4.1 High-Level Design

Thread-Safe Queue Architecture
══════════════════════════════

                 ┌───────────────────────────────────────────────────┐
                 │              ThreadSafeQueue<T>                   │
                 │                                                   │
   Producers     │  ┌─────────┐   ┌──────────────────────────────┐  │  Consumers
   ═════════     │  │         │   │       Internal Queue         │  │  ═════════
                 │  │  Mutex  │   │                              │  │
  ┌─────────┐    │  │   (m)   │   │  ┌────┬────┬────┬────┬────┐ │  │   ┌─────────┐
  │Thread 1 │─┐  │  │         │   │  │ T1 │ T2 │ T3 │ T4 │ T5 │ │  │ ┌─│Thread A │
  └─────────┘ │  │  └────┬────┘   │  └────┴────┴────┴────┴────┘ │  │ │ └─────────┘
  ┌─────────┐ ├──┼───────┼────────┤     std::queue<T> q         │──┼─┤
  │Thread 2 │─┤  │       │        │                              │  │ │ ┌─────────┐
  └─────────┘ │  │       ▼        └──────────────────────────────┘  │ ├─│Thread B │
  ┌─────────┐ │  │  ┌─────────┐                                     │ │ └─────────┘
  │Thread 3 │─┘  │  │Condition│   ┌──────────────────────────────┐  │ │
  └─────────┘    │  │Variable │   │     Shutdown Flag            │  │ │ ┌─────────┐
                 │  │  (cv)   │   │     bool is_shutdown         │  │ └─│Thread C │
   push(T)       │  └─────────┘   └──────────────────────────────┘  │     └─────────┘
   ─────────►    │       │                     │                    │    ◄─────────
                 │       └─────────────────────┘                    │      pop()
                 │         notify_one/all()                         │
                 └───────────────────────────────────────────────────┘

4.2 Key Components

Component                    Purpose                     Implementation
─────────                    ───────                     ──────────────
std::queue<T> q              Stores the actual data      FIFO ordering, not thread-safe
std::mutex m                 Protects all shared state   Guards q, is_shutdown
std::condition_variable cv   Consumer notification       Wakes sleeping consumers
bool is_shutdown             Graceful shutdown flag      Checked after wakeup

4.3 Data Structures

template<typename T>
class ThreadSafeQueue {
private:
    // The underlying queue (not thread-safe by itself)
    std::queue<T> q;

    // Mutex to protect ALL shared state
    mutable std::mutex m;

    // Condition variable for consumer notification
    std::condition_variable cv;

    // Shutdown flag
    bool is_shutdown = false;

public:
    void push(T value);
    std::optional<T> pop();
    std::optional<T> try_pop();
    void shutdown();
    bool empty() const;
    size_t size() const;
};

4.4 Algorithm Overview: Wait/Notify Pattern

push() Algorithm:

1. Lock mutex
2. Push item to queue
3. Unlock mutex (via RAII)
4. Notify one waiting consumer

pop() Algorithm:

1. Lock mutex (unique_lock for cv.wait)
2. Wait on CV with predicate: (!queue.empty() || is_shutdown)
   - If predicate false: unlock mutex, sleep
   - On wakeup: relock mutex, recheck predicate
   - If predicate true: continue
3. If shutdown and queue empty: return nullopt
4. Pop item from queue
5. Unlock mutex (via RAII)
6. Return item

shutdown() Algorithm:

1. Lock mutex
2. Set is_shutdown = true
3. Unlock mutex
4. Notify ALL waiting consumers

State Diagram for Consumer Thread

                    ┌────────────────────────┐
                    │       RUNNING          │
                    │   (not holding lock)   │
                    └───────────┬────────────┘
                                │
                                │ call pop()
                                ▼
                    ┌────────────────────────┐
                    │    ACQUIRING LOCK      │
                    │   unique_lock(mutex)   │
                    └───────────┬────────────┘
                                │
                                │ lock acquired
                                ▼
              ┌────────────────────────────────────┐
              │         CHECK PREDICATE            │
              │  (!queue.empty() || is_shutdown)   │
              └────────────────┬───────────────────┘
                               │
         ┌─────────────────────┼─────────────────────┐
         │ false               │                true │
         ▼                     │                     ▼
┌────────────────────┐         │        ┌────────────────────┐
│     WAITING        │         │        │   LOCK HELD        │
│  unlock mutex      │         │        │   check shutdown   │
│  sleep on cv       │         │        └────────┬───────────┘
│  ...               │         │                 │
│  [notified]        │         │    ┌────────────┴────────────┐
│  relock mutex      │─────────┘    │                         │
└────────────────────┘              ▼                         ▼
                           shutdown && empty         data available
                           ┌─────────────┐          ┌─────────────┐
                           │return nullopt│         │ pop & return│
                           └─────────────┘          └─────────────┘

5. Implementation Guide

5.1 Development Environment Setup

# Ubuntu/Debian
sudo apt install g++-11 cmake valgrind

# Verify C++17 support
g++ --version  # Should be 7+ for C++17

# Install ThreadSanitizer (usually included with GCC)
# Verify with:
echo 'int main(){}' | g++ -fsanitize=thread -x c++ -

# macOS
brew install llvm cmake

# Verify
clang++ --version  # Should be 10+ for good C++17

5.2 Project Structure

thread_safe_queue/
├── include/
│   └── thread_safe_queue.hpp    # Header-only implementation
├── src/
│   └── main.cpp                 # Demo/test driver
├── tests/
│   ├── unit_tests.cpp           # Single-threaded correctness tests
│   ├── concurrent_tests.cpp     # Multi-threaded stress tests
│   └── benchmark.cpp            # Performance tests
├── CMakeLists.txt
└── README.md

CMakeLists.txt:

cmake_minimum_required(VERSION 3.14)
project(ThreadSafeQueue)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# Enable ThreadSanitizer for debug builds
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=thread -g")
set(CMAKE_EXE_LINKER_FLAGS_DEBUG "${CMAKE_EXE_LINKER_FLAGS_DEBUG} -fsanitize=thread")

add_executable(demo src/main.cpp)
target_include_directories(demo PRIVATE include)
target_link_libraries(demo pthread)

add_executable(tests tests/concurrent_tests.cpp)
target_include_directories(tests PRIVATE include)
target_link_libraries(tests pthread)

5.3 The Core Question You’re Answering

“How do you safely share a data structure between multiple threads while avoiding both data races and busy-waiting, and how do you cleanly shut down when work is done?”

This question forces you to understand:

  • Why shared mutable state is dangerous
  • How mutexes provide mutual exclusion
  • How condition variables provide efficient waiting
  • Why predicate-based waiting is essential
  • How to signal shutdown without deadlock

5.4 Concepts You Must Understand First

Before coding, ensure you can answer these questions:

Concept               Self-Assessment Question                                          Where to Learn
───────               ────────────────────────                                          ──────────────
Thread basics         How does std::thread differ from a function call?                 Williams Ch. 2
Mutex semantics       What happens if a thread tries to lock an already-locked mutex?   Williams Ch. 3.1
RAII locking          Why use std::lock_guard instead of m.lock()/m.unlock()?           Williams Ch. 3.2
Condition variables   Why can’t you use lock_guard with condition variables?            Williams Ch. 4.1
Spurious wakeups      What is a spurious wakeup and why does it happen?                 Williams Ch. 4.1.1
Memory ordering       What does “happens-before” mean?                                  Williams Ch. 5

5.5 Questions to Guide Your Design

Data Structure Questions:

  1. What underlying container will you use? (std::queue, std::deque, custom?)
  2. Should the queue be bounded (max capacity) or unbounded?
  3. What happens when pop() is called on an empty queue?

Thread Safety Questions:

  1. Which operations need to be atomic?
  2. Where exactly should the mutex be locked and unlocked?
  3. Can multiple readers read concurrently, or does size() also need locking?

Condition Variable Questions:

  1. When should notify_one() vs notify_all() be used?
  2. What is the predicate for the consumer’s wait?
  3. How will you handle spurious wakeups?

Shutdown Questions:

  1. How do waiting consumers know to stop?
  2. Should shutdown() wait for the queue to empty, or return immediately?
  3. What should pop() return after shutdown?

5.6 Thinking Exercise

Before writing any code, trace through this scenario by hand:

Initial State: Queue is empty, 2 consumers waiting

Time    Producer         Queue     Consumer A          Consumer B
────    ────────         ─────     ──────────          ──────────
T1      push(42)         [42]      waiting (cv)        waiting (cv)
T2      notify_one()     [42]      woken, locking      waiting (cv)
T3                       [42]      locked, popping     waiting (cv)
T4                       []        unlocked            waiting (cv)
                                   processing 42

Now answer:

  1. Why didn’t Consumer B wake up at T2?
  2. What would happen if we used notify_all() instead?
  3. What if the producer pushes 2 items before any consumer wakes?

5.7 Hints in Layers

Hint 1: Getting Started

Start with the simplest possible interface:

template<typename T>
class ThreadSafeQueue {
private:
    std::queue<T> q;
    std::mutex m;
    std::condition_variable cv;
    bool done = false;

public:
    void push(T value);
    std::optional<T> pop();
    void shutdown();
};

Focus on getting push() and pop() correct first. One key detail is when push() notifies; notifying after releasing the lock is preferable (see Hint 3 below).

Hint 2: The Pop Implementation

The critical part is the wait predicate:

std::optional<T> pop() {
    std::unique_lock<std::mutex> lock(m);

    // This is the KEY line - handles spurious wakeups
    cv.wait(lock, [this]{ return !q.empty() || done; });

    // After wait: either queue has data OR we're shutting down
    if (done && q.empty()) {
        return std::nullopt;
    }

    T value = std::move(q.front());
    q.pop();
    return value;
}

The lambda predicate is checked:

  1. Before sleeping (if true, don’t sleep at all)
  2. After every wakeup (if false, go back to sleep)

Hint 3: Push and Notify Timing

Should you notify inside or outside the lock?

void push(T value) {
    {
        std::lock_guard<std::mutex> lock(m);
        q.push(std::move(value));
    }  // Lock released HERE

    cv.notify_one();  // Notify AFTER unlocking
}

Why notify after unlocking?

  • If notified thread wakes immediately, it can acquire the lock right away
  • Avoids “hurry up and wait” where notified thread wakes but can’t proceed

Note: Notifying while still holding the lock is also correct, just potentially less efficient: the woken thread may immediately block again on the mutex the notifier still holds.

Hint 4: Shutdown Mechanism

void shutdown() {
    {
        std::lock_guard<std::mutex> lock(m);
        done = true;
    }
    cv.notify_all();  // Wake ALL waiting consumers
}

Why notify_all() and not notify_one()?

  • All consumers need to check done flag
  • With notify_one(), only one consumer wakes; the others keep waiting forever

After shutdown:

  • pop() returns nullopt for waiting consumers
  • Future push() calls should be handled (either assert, ignore, or queue normally) - one option is sketched below
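
One possible way to handle a push() after shutdown, building on the members from Hint 1 (a sketch of the "ignore" policy; your project may choose to throw instead):

void push(T value) {
    {
        std::lock_guard<std::mutex> lock(m);
        if (done) {
            return;                 // policy choice: silently drop the item
        }
        q.push(std::move(value));
    }
    cv.notify_one();
}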

5.8 The Interview Questions They’ll Ask

After completing this project, you should be able to answer:

  1. “Explain the producer-consumer problem and how you’d solve it in C++.”
    • Define the problem (shared buffer, synchronization needs)
    • Explain mutex for mutual exclusion
    • Explain condition variable for efficient waiting
    • Discuss notify_one vs notify_all
  2. “What’s a spurious wakeup and how do you handle it?”
    • Define: waking without notification
    • Cause: OS scheduler optimizations, implementation details
    • Solution: always wait with a predicate in a loop
  3. “What’s the difference between std::lock_guard and std::unique_lock?”
    • lock_guard: simple RAII, lock at construction, unlock at destruction
    • unique_lock: flexible, can unlock/relock, required for condition variables
  4. “How would you implement a bounded queue (with max capacity)?”
    • Add second condition variable for “not full”
    • push() waits on not_full CV when at capacity
    • pop() notifies not_full CV after removing item
  5. “What happens if you forget to notify after pushing?”
    • Consumers may wait forever (deadlock-like)
    • Queue fills up but consumers never wake
  6. “Why use notify_all() for shutdown instead of notify_one()?”
    • All consumers need to see shutdown flag
    • notify_one() would leave N-1 consumers waiting
  7. “How would you add timeout support to pop()?” (a sketch follows this list)
    • Use cv.wait_for() or cv.wait_until()
    • Return nullopt on timeout
    • Check predicate after timeout (might have data)
  8. “Is your queue lock-free? Should it be?”
    • No, uses mutex
    • Lock-free harder to implement correctly
    • Lock-based often faster under low contention
    • Profile before optimizing
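
For question 7, a pop_with_timeout() might look like the following sketch (not part of the required interface; it reuses the members from the hints above):

std::optional<T> pop_with_timeout(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(m);

    // wait_for returns false if the timeout expired with the predicate still false
    if (!cv.wait_for(lock, timeout, [this] { return !q.empty() || done; })) {
        return std::nullopt;  // timed out with no data and no shutdown
    }

    if (done && q.empty()) {
        return std::nullopt;  // shutting down
    }

    T value = std::move(q.front());
    q.pop();
    return value;
}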

5.9 Books That Will Help

Topic                                Book                        Chapter
─────                                ────                        ───────
Thread basics and std::thread        C++ Concurrency in Action   Ch. 2: Managing threads
Mutexes and protecting shared data   C++ Concurrency in Action   Ch. 3: Sharing data between threads
Condition variables                  C++ Concurrency in Action   Ch. 4: Synchronizing concurrent operations
Memory model and atomics             C++ Concurrency in Action   Ch. 5: The C++ memory model and operations
Lock-free alternatives               C++ Concurrency in Action   Ch. 7: Designing lock-free data structures
RAII and exception safety            Effective C++               Item 13: Use objects to manage resources
Move semantics for efficiency        Effective Modern C++        Items 23-25: Move semantics

5.10 Implementation Phases

Phase 1: Basic Queue (Days 1-2)

Goal: Single producer, single consumer, no shutdown

// Start with the absolute minimum
template<typename T>
class ThreadSafeQueue {
    std::queue<T> q;
    std::mutex m;
    std::condition_variable cv;

public:
    void push(T value) {
        std::lock_guard<std::mutex> lock(m);
        q.push(std::move(value));
        cv.notify_one();
    }

    T pop() {  // Blocking pop, returns value directly
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this]{ return !q.empty(); });
        T value = std::move(q.front());
        q.pop();
        return value;
    }
};

Checkpoint: Test with one producer pushing 1000 items, one consumer popping 1000 items. Verify all items received.

Phase 2: Multi-Producer Multi-Consumer (Days 3-4)

Goal: Multiple producers and consumers work correctly

Changes:

  • Same code works! (mutex ensures mutual exclusion)
  • Add stress test with N producers, M consumers

Checkpoint: Run ThreadSanitizer, verify no data races:

g++ -std=c++17 -fsanitize=thread -g tests.cpp -lpthread
./a.out  # Should report no errors

Phase 3: Shutdown Support (Days 5-6)

Goal: Clean shutdown without hanging threads

template<typename T>
class ThreadSafeQueue {
    // ... previous members ...
    bool is_shutdown = false;

public:
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(m);
            is_shutdown = true;
        }
        cv.notify_all();
    }

    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this]{ return !q.empty() || is_shutdown; });

        if (is_shutdown && q.empty()) {
            return std::nullopt;
        }

        T value = std::move(q.front());
        q.pop();
        return value;
    }
};

Checkpoint: Producers finish, call shutdown(), all consumers exit cleanly.

Phase 4: try_pop and Additional Features (Days 7-8)

Goal: Non-blocking pop, size query

std::optional<T> try_pop() {
    std::lock_guard<std::mutex> lock(m);
    if (q.empty()) {
        return std::nullopt;
    }
    T value = std::move(q.front());
    q.pop();
    return value;
}

// Note: these const members require the mutex to be declared mutable
// (as in the class sketch in Section 4.3) so they can lock it.
bool empty() const {
    std::lock_guard<std::mutex> lock(m);
    return q.empty();
}

size_t size() const {
    std::lock_guard<std::mutex> lock(m);
    return q.size();
}

Note: size() is inherently racy - by the time you use the result, it may have changed.
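
The same reasoning applies to callers: checking empty() and then popping is a check-then-act race, which is exactly why try_pop() exists. An illustrative fragment (process() is a placeholder for whatever the consumer does):

// RACY: another consumer can drain the queue between the two calls,
// so the blocking pop() may end up waiting even though empty() was false.
if (!queue.empty()) {
    auto item = queue.pop();
}

// SAFE: the emptiness check and the removal happen under a single lock.
if (auto item = queue.try_pop()) {
    process(*item);
}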

Phase 5: Bounded Queue (Days 9-10) - Optional

Goal: Maximum capacity with blocking push

template<typename T>
class BoundedQueue {
    std::queue<T> q;
    std::mutex m;
    std::condition_variable not_empty;  // Consumer waits on this
    std::condition_variable not_full;   // Producer waits on this
    size_t capacity;
    bool is_shutdown = false;

public:
    explicit BoundedQueue(size_t cap) : capacity(cap) {}

    void push(T value) {
        std::unique_lock<std::mutex> lock(m);
        not_full.wait(lock, [this]{ return q.size() < capacity || is_shutdown; });

        if (is_shutdown) return;  // or throw

        q.push(std::move(value));
        not_empty.notify_one();
    }

    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(m);
        not_empty.wait(lock, [this]{ return !q.empty() || is_shutdown; });

        if (is_shutdown && q.empty()) return std::nullopt;

        T value = std::move(q.front());
        q.pop();
        not_full.notify_one();  // Signal producer that space available
        return value;
    }
};
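
The sketch above omits shutdown(). For the bounded variant it must wake waiters on both condition variables, otherwise producers blocked on a full queue would never observe the flag (a sketch):

void shutdown() {
    {
        std::lock_guard<std::mutex> lock(m);
        is_shutdown = true;
    }
    not_empty.notify_all();  // wake consumers blocked on an empty queue
    not_full.notify_all();   // wake producers blocked on a full queue
}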

5.11 Key Implementation Decisions

Decision                     Options                     Recommendation         Rationale
────────                     ───────                     ──────────────         ─────────
Return type of pop()         T vs optional<T>            optional<T>            Allows signaling shutdown cleanly
Notification timing          Inside vs outside lock      Outside (after lock)   Slightly more efficient
Underlying container         queue vs deque vs list      std::queue<T>          Simple, sufficient for FIFO
Lock type for pop()          lock_guard vs unique_lock   unique_lock            Required for cv.wait()
Bounded vs unbounded         Fixed cap vs unlimited      Unbounded first        Add bounds later if needed
Exception on shutdown push   Throw vs ignore vs queue    Document and choose    Depends on use case

6. Testing Strategy

Unit Tests (Single-Threaded Correctness)

void test_basic_push_pop() {
    ThreadSafeQueue<int> q;
    q.push(42);
    auto val = q.try_pop();
    assert(val.has_value());
    assert(*val == 42);
    std::cout << "test_basic_push_pop PASSED\n";
}

void test_fifo_order() {
    ThreadSafeQueue<int> q;
    for (int i = 0; i < 100; ++i) {
        q.push(i);
    }
    for (int i = 0; i < 100; ++i) {
        auto val = q.try_pop();
        assert(val.has_value() && *val == i);
    }
    std::cout << "test_fifo_order PASSED\n";
}

void test_empty_try_pop() {
    ThreadSafeQueue<int> q;
    auto val = q.try_pop();
    assert(!val.has_value());
    std::cout << "test_empty_try_pop PASSED\n";
}

void test_shutdown_wakes_consumers() {
    ThreadSafeQueue<int> q;
    std::atomic<bool> woke_up{false};

    std::thread consumer([&]{
        auto val = q.pop();  // Blocks on empty queue
        woke_up = true;
        assert(!val.has_value());  // Should return nullopt
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    assert(!woke_up);  // Consumer should still be waiting

    q.shutdown();
    consumer.join();

    assert(woke_up);
    std::cout << "test_shutdown_wakes_consumers PASSED\n";
}

Concurrent Stress Tests

void stress_test_multi_producer_consumer() {
    constexpr int NUM_PRODUCERS = 4;
    constexpr int NUM_CONSUMERS = 4;
    constexpr int ITEMS_PER_PRODUCER = 10000;

    ThreadSafeQueue<int> q;
    std::atomic<int> total_pushed{0};
    std::atomic<int> total_popped{0};

    auto producer = [&](int /*id*/) {  // parameter unused; commented out to avoid -Wextra warnings
        for (int i = 0; i < ITEMS_PER_PRODUCER; ++i) {
            q.push(i);
            ++total_pushed;
        }
    };

    auto consumer = [&] {
        while (true) {
            auto val = q.pop();
            if (!val.has_value()) break;
            ++total_popped;
        }
    };

    std::vector<std::thread> producers, consumers;

    for (int i = 0; i < NUM_PRODUCERS; ++i) {
        producers.emplace_back(producer, i);
    }
    for (int i = 0; i < NUM_CONSUMERS; ++i) {
        consumers.emplace_back(consumer);
    }

    for (auto& t : producers) t.join();
    q.shutdown();
    for (auto& t : consumers) t.join();

    assert(total_pushed == NUM_PRODUCERS * ITEMS_PER_PRODUCER);
    assert(total_popped == total_pushed);

    std::cout << "stress_test PASSED: " << total_popped << " items\n";
}

ThreadSanitizer Verification

# Compile with ThreadSanitizer
g++ -std=c++17 -fsanitize=thread -g concurrent_tests.cpp -lpthread -o tsan_test

# Run - should report no data races
./tsan_test

# Expected output:
# stress_test PASSED: 40000 items
# (no ThreadSanitizer warnings)

Performance Benchmarks

void benchmark_throughput() {
    ThreadSafeQueue<int> q;
    constexpr int OPS = 1000000;

    auto start = std::chrono::high_resolution_clock::now();

    std::thread producer([&] {
        for (int i = 0; i < OPS; ++i) {
            q.push(i);
        }
    });

    std::thread consumer([&] {
        for (int i = 0; i < OPS; ++i) {
            q.pop();
        }
    });

    producer.join();
    consumer.join();

    auto end = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

    double ops_per_sec = (OPS * 2.0) / (duration.count() / 1000.0);
    std::cout << "Throughput: " << ops_per_sec << " ops/sec\n";
}

7. Common Pitfalls & Debugging

Pitfall                      Symptom                           Fix
───────                      ───────                           ───
Forgetting to notify         Consumers hang forever            Always notify_one() after push()
Using lock_guard with CV     Compile error                     Use unique_lock for cv.wait()
Notifying without lock       Potential lost wakeup             Notify is safe without lock, but set state with lock
Not using predicate wait     Random hangs (spurious wakeups)   Always use cv.wait(lock, predicate)
notify_one for shutdown      Only one consumer exits           Use notify_all() for shutdown
Data race on shutdown flag   TSan error, undefined behavior    Always read/write under mutex
Locking in wrong order       Deadlock                          Consistent lock ordering
Forgetting move semantics    Unnecessary copies                Use std::move in push/pop

Debugging Race Conditions

Step 1: Enable ThreadSanitizer

g++ -fsanitize=thread -g your_code.cpp -lpthread

Step 2: Interpret TSan output

WARNING: ThreadSanitizer: data race (pid=12345)
  Write of size 4 at 0x... by thread T1:
    #0 ThreadSafeQueue::push() at queue.hpp:42
  Previous read of size 4 at 0x... by thread T2:
    #0 ThreadSafeQueue::pop() at queue.hpp:58

This tells you exactly which lines have the race.

Step 3: Common TSan warnings and fixes

TSan Warning           Likely Cause                               Fix
────────────           ────────────                               ───
Data race on bool      Shutdown flag accessed without lock        Add lock around all flag accesses
Data race on queue     Queue accessed without lock                Verify lock scope covers all operations
Lock-order-inversion   Two threads lock A,B in different orders   Establish consistent lock ordering

Performance Debugging

High lock contention symptoms:

  • CPU usage high but throughput low
  • Many threads in “waiting for lock” state

Diagnosis:

// Add timing instrumentation
auto start = std::chrono::steady_clock::now();
std::lock_guard<std::mutex> lock(m);
auto acquired = std::chrono::steady_clock::now();
auto wait_time = acquired - start;
// Log wait_time if > threshold

Solutions:

  1. Reduce critical section size
  2. Use try_lock with fallback
  3. Consider lock-free queue if profiling shows contention

8. Extensions & Challenges

Beginner Extensions

  • Add timeout to pop(): Use cv.wait_for() with duration
  • Add statistics: Track total items pushed/popped
  • Add peek(): Look at front without removing (sketched after this list)
  • Thread-safe iteration: Snapshot the queue for debugging
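
A possible peek() for the beginner extension (a sketch; it returns a copy, since a reference to the front element could dangle once the lock is released, and it assumes the mutex is declared mutable as in Section 4.3):

std::optional<T> peek() const {
    std::lock_guard<std::mutex> lock(m);
    if (q.empty()) {
        return std::nullopt;
    }
    return q.front();  // returned by value; copying under the lock keeps it safe
}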

Intermediate Extensions

  • Bounded queue: Max capacity with blocking push
  • Priority queue: Items popped by priority, not FIFO
  • Multiple queues: Route to queues based on item type
  • Wait for empty: Block until all items consumed

Advanced Extensions

  • Lock-free implementation: Use atomics instead of mutex
  • Work-stealing: Allow idle consumers to steal from other queues
  • Batch operations: push_many() / pop_many() for efficiency
  • NUMA-aware: Separate queues per NUMA node

Research Extensions

  • Formal verification: Prove correctness using model checker
  • Benchmark comparison: Compare with boost::lockfree, folly::MPMCQueue
  • Adaptive spinning: Spin briefly before sleeping on CV

9. Real-World Connections

Industry Applications

System               How It Uses Producer-Consumer
──────               ─────────────────────────────
Apache Kafka         Topics are queues, producers push messages, consumer groups pop
Redis                LIST data type with BLPOP/BRPOP for blocking queues
RabbitMQ             Message broker built on queue semantics
Go channels          Language-level bounded producer-consumer queues
Java BlockingQueue   java.util.concurrent provides several implementations

Widely used C++ implementations worth studying:

  • folly::MPMCQueue: Facebook’s multi-producer multi-consumer queue (lock-free)
  • boost::lockfree::queue: Boost’s lock-free queue implementation
  • moodycamel::ConcurrentQueue: Popular single-header lock-free queue
  • tbb::concurrent_queue: Intel Threading Building Blocks

Interview Relevance

This project prepares you for questions at companies like:

Company         Relevant Domain
───────         ───────────────
Meta            Async event processing in React Native
Google          RPC systems, MapReduce task queues
Amazon          SQS, internal message passing
Microsoft       Windows thread pool, async I/O
Trading firms   Order book updates, market data

10. Resources

Essential Reading

Resource                                       What You’ll Learn
────────                                       ─────────────────
“C++ Concurrency in Action” Ch. 3-4            Deep dive on mutexes, condition variables
cppreference.com condition_variable            API details and edge cases
Herb Sutter’s “Effective Concurrency” series   Design patterns and pitfalls

Video Resources

  • CppCon 2014: “Lock-Free Programming” by Herb Sutter
  • CppCon 2017: “C++ atomics: from basic to advanced” by Fedor Pikus
  • MIT 6.172: Performance Engineering lectures on concurrency

Tools

  • ThreadSanitizer: Data race detector (built into GCC/Clang)
  • Helgrind: Valgrind tool for concurrency bugs
  • rr: Record and replay debugger (great for race conditions)
  • Intel Inspector: Commercial tool for threading analysis

Reference Implementations

  • C++ Standard: Look at <mutex>, <condition_variable> headers
  • libstdc++: GCC’s implementation in bits/std_mutex.h
  • libc++: Clang’s implementation in __threading_support

11. Self-Assessment Checklist

Before considering this project complete, verify:

Understanding

  • I can explain why both mutex and condition variable are needed
  • I can describe what happens during cv.wait() step by step
  • I can explain spurious wakeups and the predicate solution
  • I understand why unique_lock is required for condition variables
  • I can explain the difference between notify_one and notify_all

Implementation

  • Push and pop work correctly with single producer/consumer
  • Multiple producers and consumers work without data corruption
  • Shutdown wakes all waiting consumers
  • ThreadSanitizer reports no data races
  • CPU usage is near 0% when queue is empty (no busy-waiting)

Quality

  • Code compiles with -Wall -Wextra without warnings
  • Exception safety: push provides strong guarantee
  • Move semantics used where appropriate
  • Header-only implementation works across translation units

Verification

  • All unit tests pass
  • Stress tests pass consistently
  • Tested with ThreadSanitizer
  • Tested shutdown under various conditions

12. Submission / Completion Criteria

Minimum Viable Completion

  • push() and pop() work correctly
  • Works with multiple producers and consumers
  • No data races (ThreadSanitizer clean)
  • Basic test coverage

Full Completion

  • All above plus:
  • try_pop() implemented
  • shutdown() wakes all consumers
  • empty() and size() work
  • Comprehensive test coverage
  • Clean code with comments

Excellence (Going Above & Beyond)

  • Bounded queue variant
  • Timeout support for pop
  • Performance benchmarks
  • Comparison with std::mutex alternatives (shared_mutex)
  • Lock-free implementation attempt

Next Project: P03-compile-time-units.md - Template Metaprogramming

This guide was expanded from LEARN_ADVANCED_CPP_DEEP_DIVE.md. For the complete learning path, see the README.