Project 1: Multi-Threaded Log Aggregator
Build a system where multiple producer threads generate log messages while a single consumer thread aggregates and writes them to disk.
Quick Reference
| Attribute | Value |
|---|---|
| Difficulty | Beginner |
| Time Estimate | Weekend (8-16 hours) |
| Language | C++ (C++17 or later) |
| Alternatives | Rust, Go, Java |
| Knowledge Area | Threading Fundamentals / Synchronization |
| Main Book | “C++ Concurrency in Action, 2nd Ed” - Williams |
| Portfolio Value | Resume Gold |
Learning Objectives
By completing this project, you will:
- Launch and manage multiple threads using std::thread and understand the thread lifecycle
- Protect shared data from race conditions using std::mutex and lock guards
- Coordinate thread communication with std::condition_variable to eliminate wasteful polling
- Build a thread-safe queue that becomes a reusable building block for concurrent systems
- Implement graceful shutdown using std::atomic<bool> for inter-thread signaling
- Debug threading issues using sanitizers and understand common pitfalls
- Measure and reason about throughput in concurrent systems
Theoretical Foundation
Core Concepts
1. Threads: Independent Execution Paths
A thread is an independent path of execution within a process. All threads share the same memory space (heap, global variables), but each has its own stack and registers.
Process Memory Space
+------------------------------------------+
|                                          |
|               Shared Heap                |
|         (all threads can access)         |
|                                          |
+------------------------------------------+
|         Shared Global Variables          |
|     (this is where problems start!)      |
+------------------------------------------+
|                                          |
|   Thread 1     Thread 2     Thread 3     |
|   +------+     +------+     +------+     |
|   | Stack|     | Stack|     | Stack|     |
|   |      |     |      |     |      |     |
|   +------+     +------+     +------+     |
|                                          |
+------------------------------------------+
Key insight: When multiple threads access the same memory location without synchronization, and at least one of them is writing, you have a data race. Data races are undefined behavior in C++.
2. Mutex: Mutual Exclusion
A mutex (mutual exclusion) ensures only one thread can execute a critical section at a time:
Thread 1 Thread 2
| |
| [try to lock] |
| | |
v v |
+-------------------+ |
| CRITICAL SECTION | | [waiting for lock]
| (only one thread) | | .
+-------------------+ | .
| | .
| [unlock] | .
| v
| +-------------------+
| | CRITICAL SECTION |
| | (now Thread 2) |
| +-------------------+
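In C++, avoid calling lock() and unlock() directly: an early return or an exception between the two calls leaves the mutex locked forever. Use RAII lock guards instead: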
std::mutex mtx;
std::queue<LogEntry> shared_queue;

void producer() {
    LogEntry entry = generate_log();
    {
        std::lock_guard<std::mutex> lock(mtx);  // Acquires lock
        shared_queue.push(entry);               // Protected access
    }  // Lock released automatically when 'lock' goes out of scope
}
3. Condition Variables: Efficient Waiting
Without condition variables, the consumer must poll (busy-wait):
// BAD: Wasteful polling
while (running) {
    std::lock_guard<std::mutex> lock(mtx);
    if (!queue.empty()) {
        process(queue.front());
        queue.pop();
    }
    // If queue is empty, we just checked for nothing!
}
A condition variable lets a thread sleep until a condition becomes true:
Producer              Condition Variable              Consumer
   |                          |                          |
   | push(item)               |                          | [sleeping]
   | notify_one() ----------->|                          | .
   |                          |--- wake up! ------------>|
   |                          |                          | pop(item)
std::mutex mtx;
std::queue<LogEntry> queue;
std::condition_variable cv;
std::atomic<bool> running{true};  // Set to false while holding mtx, then call cv.notify_all()

void producer() {
    LogEntry entry = generate_log();
    {
        std::lock_guard<std::mutex> lock(mtx);
        queue.push(entry);
    }
    cv.notify_one();  // Wake up the consumer
}

void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        // Sleep until there is work or shutdown was requested
        cv.wait(lock, [&]{ return !queue.empty() || !running; });
        if (queue.empty()) {
            break;  // Only reachable after shutdown: queue is drained, so exit
        }
        LogEntry entry = queue.front();
        queue.pop();
        lock.unlock();  // Release lock before I/O
        write_to_disk(entry);
    }
}
Critical: The predicate [&]{ return !queue.empty() || !running; } handles spurious wakeups: the OS may wake your thread even when nothing called notify. wait() re-checks the predicate after every wakeup and goes back to sleep if it is still false.
4. Atomic Operations: Lock-Free Signaling
For simple flags, std::atomic provides thread-safe access without locks:
std::atomic<bool> shutdown_requested{false};
// In main thread:
shutdown_requested.store(true);
// In worker threads:
while (!shutdown_requested.load()) {
// do work
}
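As a minimal sketch of the flag in action, the function below starts one worker, lets it spin for a short time, then requests shutdown and joins. The name run_until_stopped and the iteration counter are hypothetical, introduced only for illustration.

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <thread>

// Hypothetical helper: runs one worker until the flag is set and returns
// how many loop iterations the worker completed.
std::uint64_t run_until_stopped(std::chrono::milliseconds run_time) {
    std::atomic<bool> shutdown_requested{false};
    std::uint64_t iterations = 0;  // Written only by the worker thread

    std::thread worker([&] {
        while (!shutdown_requested.load()) {
            ++iterations;  // Stand-in for real work
        }
    });

    std::this_thread::sleep_for(run_time);
    shutdown_requested.store(true);  // Signal the worker to stop
    worker.join();                   // After join(), reading 'iterations' is safe
    return iterations;
}
```

Calling run_until_stopped(std::chrono::milliseconds(50)) returns once the worker has observed the flag. The plain counter needs no lock here because only the worker writes it and the caller reads it after join().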
Why This Matters
Modern CPUs are parallel: A typical 2024 laptop has 8-16 cores. Single-threaded code can use only one of them. To utilize modern hardware, you must understand concurrent programming.
Concurrency is hard: Race conditions, deadlocks, and livelocks are subtle bugs that may only appear under specific timing conditions. They are notoriously difficult to reproduce and debug. This project gives you controlled exposure to these challenges.
The Producer-Consumer pattern is everywhere:
- Web servers: Accept thread (producer) -> Worker pool (consumers)
- Message queues: Publishers (producers) -> Subscribers (consumers)
- Logging systems: Application threads (producers) -> Log writer (consumer)
- Video games: Game logic (producer) -> Render thread (consumer)
Historical Context
The concepts you’ll use have deep roots:
- Mutual Exclusion (Mutex): First formalized by Dijkstra in 1965 with his semaphore concept
- Producer-Consumer Problem: Classic synchronization problem from operating systems research (1960s)
- Condition Variables: Introduced in Monitors (Hoare, 1974)
- C++ Threading: Added in C++11 (2011), previously required platform-specific APIs (pthreads, Windows threads)
Common Misconceptions
Misconception 1: “More threads = more performance” Reality: Too many threads cause excessive context switching. Beyond a certain point, adding threads hurts performance.
Misconception 2: “std::mutex is slow, avoid it” Reality: Uncontended mutex acquisition typically costs on the order of tens of nanoseconds (roughly 25 ns on common desktop hardware). The problem is contention (threads waiting for each other), not the mutex itself.
Misconception 3: “I only read the variable, so I don’t need synchronization” Reality: If another thread writes to it, you need synchronization. An unsynchronized read that overlaps a write is a data race, which is undefined behavior, and the reader may also observe a partially written value.
Misconception 4: “My tests pass, so there are no race conditions” Reality: Race conditions are timing-dependent. They may not manifest in your test environment but could cause crashes in production.
Project Specification
What You Will Build
A multi-threaded log aggregation system consisting of:
- N producer threads that generate log messages at high speed
- 1 consumer thread that aggregates messages and writes them to disk in batches
- A thread-safe queue connecting producers to the consumer
- Graceful shutdown ensuring no messages are lost
Functional Requirements
- Producer Threads:
  - Each producer generates log entries with timestamps, log levels, thread ID, and message content
  - Producers push entries to a shared queue
  - Configurable number of producers (default: 3)
  - Configurable entries per producer (default: 5000)
- Consumer Thread (Aggregator):
  - Waits for entries using condition variables (no busy-waiting)
  - Aggregates entries into batches (configurable batch size, default: 1000)
  - Writes batches to disk with proper formatting
  - Handles both batch-size and timeout-based flushing
- Thread-Safe Queue:
  - Supports concurrent push from multiple producers
  - Supports pop/wait from consumer
  - Provides size/empty queries safely
  - Supports shutdown signaling
- Shutdown Handling:
  - Clean shutdown on SIGINT (Ctrl+C) or programmatic signal
  - Drain remaining queue entries before exit
  - Report final statistics (entries processed, dropped, latency)
- Statistics and Monitoring:
  - Count of entries generated per producer
  - Count of entries written
  - Average latency from generation to write
  - Any dropped entries (if queue reaches maximum size)
Non-Functional Requirements
- No data races: Must pass ThreadSanitizer with zero warnings
- No deadlocks: System must not hang under any circumstances
- Bounded memory: Queue has maximum size (default: 10000 entries)
- Graceful degradation: Under overload, drop with warning rather than crash
- Cross-platform: Compile on Linux, macOS, Windows (with standard C++)
Example Usage and Output
$ ./log_aggregator --producers 3 --entries 5000 --batch-size 1000
[2024-01-15T10:30:15.001Z] [INFO] Starting log aggregator...
[2024-01-15T10:30:15.002Z] [INFO] Configuration: 3 producers, 5000 entries each
[2024-01-15T10:30:15.002Z] [INFO] [Producer-1] Starting...
[2024-01-15T10:30:15.002Z] [INFO] [Producer-2] Starting...
[2024-01-15T10:30:15.003Z] [INFO] [Producer-3] Starting...
[2024-01-15T10:30:15.003Z] [INFO] [Aggregator] Starting...
[2024-01-15T10:30:15.523Z] [INFO] [Aggregator] Wrote batch #1 (1000 entries) to logs/2024-01-15_001.log
[2024-01-15T10:30:16.041Z] [INFO] [Aggregator] Wrote batch #2 (1000 entries) to logs/2024-01-15_002.log
...
[2024-01-15T10:30:18.892Z] [INFO] [Producer-1] Completed: 5000 entries generated
[2024-01-15T10:30:18.923Z] [INFO] [Producer-2] Completed: 5000 entries generated
[2024-01-15T10:30:18.945Z] [INFO] [Producer-3] Completed: 5000 entries generated
[2024-01-15T10:30:19.102Z] [INFO] [Aggregator] Final batch written (1000 entries)
[2024-01-15T10:30:19.103Z] [INFO] [Aggregator] Shutting down...
================================================================================
LOG AGGREGATOR STATISTICS
================================================================================
Total entries generated: 15000
Total entries written: 15000
Entries dropped: 0
Average latency: 0.34ms
Peak queue depth: 2847
Total runtime: 4.101s
Throughput: 3657 entries/second
================================================================================
$ ls logs/
2024-01-15_001.log 2024-01-15_002.log ... 2024-01-15_015.log
$ head logs/2024-01-15_001.log
2024-01-15T10:30:15.123456Z [INFO ] [P01] User session started: user_42 (session_id=a1b2c3)
2024-01-15T10:30:15.123589Z [WARN ] [P02] Memory usage above threshold: 85%
2024-01-15T10:30:15.123612Z [DEBUG] [P03] Cache miss for key: product_catalog_v2
2024-01-15T10:30:15.123701Z [INFO ] [P01] Request processed: GET /api/users (latency=12ms)
...
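The file lines above follow a fixed layout: UTC timestamp with microseconds, a five-character level, the producer tag, then the message. One possible sketch of LogEntry::format() that produces this layout is shown below. It assumes the system clock counts from the Unix epoch (guaranteed from C++20, true in practice before that) and uses the POSIX gmtime_r or Windows gmtime_s call because std::gmtime shares a static buffer between threads. The level_name helper is hypothetical.

```cpp
#include <chrono>
#include <cstdio>
#include <ctime>
#include <string>

enum class LogLevel { DEBUG, INFO, WARN, ERROR };

struct LogEntry {
    std::chrono::system_clock::time_point timestamp;
    LogLevel level;
    std::string thread_id;
    std::string message;
    std::string format() const;
};

// Hypothetical helper: fixed-width names keep the columns aligned.
static const char* level_name(LogLevel level) {
    switch (level) {
        case LogLevel::DEBUG: return "DEBUG";
        case LogLevel::INFO:  return "INFO ";
        case LogLevel::WARN:  return "WARN ";
        case LogLevel::ERROR: return "ERROR";
    }
    return "?????";
}

std::string LogEntry::format() const {
    using namespace std::chrono;
    // Split the timestamp into whole seconds and the microsecond remainder.
    const std::time_t t = system_clock::to_time_t(timestamp);
    const long long micros =
        duration_cast<microseconds>(timestamp.time_since_epoch() % seconds(1)).count();

    std::tm utc{};
#ifdef _WIN32
    gmtime_s(&utc, &t);   // Re-entrant UTC conversion (Windows)
#else
    gmtime_r(&t, &utc);   // Re-entrant UTC conversion (POSIX)
#endif

    char date[32];
    std::strftime(date, sizeof date, "%Y-%m-%dT%H:%M:%S", &utc);
    char frac[16];
    std::snprintf(frac, sizeof frac, ".%06lldZ", micros);

    return std::string(date) + frac + " [" + level_name(level) + "] [" +
           thread_id + "] " + message;
}
```

Formatting happens on the consumer side in this sketch, so producers only pay for constructing the entry, not for string building.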
Real World Outcome
When you complete this project, you will have:
- A working concurrent system that you can show in interviews and portfolio
- A reusable thread-safe queue class for future projects
- Experience debugging race conditions with sanitizers
- Understanding of producer-consumer that applies to countless real systems
- Confidence discussing threading in technical interviews
Solution Architecture
High-Level Design
+------------------+
| Main Thread |
| (orchestrator) |
+--------+---------+
|
+----------------+----------------+
| | |
v v v
+---------------+ +---------------+ +---------------+
| Producer 1 | | Producer 2 | | Producer 3 |
| (std::thread) | | (std::thread) | | (std::thread) |
+-------+-------+ +-------+-------+ +-------+-------+
| | |
| push() | push() | push()
v v v
+-----------------------------------------------+
| |
| Thread-Safe Queue |
| +----------------------------------------+ |
| | std::queue<LogEntry> + std::mutex | |
| | + std::condition_variable | |
| +----------------------------------------+ |
| |
+------------------------+----------------------+
|
| pop() / wait()
v
+------------------+
| Consumer |
| (Aggregator) |
| (std::thread) |
+--------+---------+
|
| batch write
v
+------------------+
| Log Files |
| (filesystem) |
+------------------+
Key Components
| Component | Responsibility | Key Classes/Functions |
|---|---|---|
| ThreadSafeQueue | Concurrent queue with blocking pop | push(), pop(), wait_and_pop(), shutdown() |
| LogEntry | Data structure for log messages | timestamp, level, thread_id, message |
| Producer | Generates log entries | operator()(), configurable rate/count |
| Aggregator | Consumes and writes entries | operator()(), batch management |
| Statistics | Tracks metrics thread-safely | atomic counters, latency tracking |
| Main | Orchestrates threads, handles signals | thread creation/joining, signal handling |
Data Structures
// Log entry - the data flowing through the system
struct LogEntry {
std::chrono::system_clock::time_point timestamp;
LogLevel level; // enum: DEBUG, INFO, WARN, ERROR
std::string thread_id;
std::string message;
std::string format() const;
};
// Thread-safe queue - the core synchronization primitive
template<typename T>
class ThreadSafeQueue {
private:
std::queue<T> queue_;
mutable std::mutex mutex_;
std::condition_variable cv_;
std::atomic<bool> shutdown_{false};
size_t max_size_;
public:
bool push(T item); // Returns false if full
std::optional<T> pop(); // Non-blocking
std::optional<T> wait_pop(); // Blocking with timeout
void shutdown();
bool is_shutdown() const;
size_t size() const;
};
// Statistics collector - thread-safe metrics
struct Statistics {
std::atomic<uint64_t> entries_generated{0};
std::atomic<uint64_t> entries_written{0};
std::atomic<uint64_t> entries_dropped{0};
std::atomic<uint64_t> total_latency_us{0};
std::atomic<uint64_t> peak_queue_depth{0};
void report() const;
};
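Counters such as entries_written only need fetch_add, but peak_queue_depth needs an atomic "maximum", which std::atomic does not provide directly. The sketch below shows one common way to build it with a compare-exchange loop, plus an average-latency calculation. The method names observe_queue_depth, record_write, and average_latency_ms are hypothetical additions, not part of the struct above.

```cpp
#include <atomic>
#include <cstdint>

struct Statistics {
    std::atomic<std::uint64_t> entries_written{0};
    std::atomic<std::uint64_t> total_latency_us{0};
    std::atomic<std::uint64_t> peak_queue_depth{0};

    // Raise the recorded peak to 'depth' if it is larger (an atomic max).
    void observe_queue_depth(std::uint64_t depth) {
        std::uint64_t current = peak_queue_depth.load();
        // On failure compare_exchange_weak reloads 'current', so the loop
        // re-checks against whatever value another thread just stored.
        while (depth > current &&
               !peak_queue_depth.compare_exchange_weak(current, depth)) {
        }
    }

    // Called by the consumer once per written entry.
    void record_write(std::uint64_t latency_us) {
        total_latency_us.fetch_add(latency_us);
        entries_written.fetch_add(1);
    }

    double average_latency_ms() const {
        const std::uint64_t written = entries_written.load();
        if (written == 0) return 0.0;  // Avoid dividing by zero before any write
        return static_cast<double>(total_latency_us.load()) / written / 1000.0;
    }
};
```

The two counters in record_write are updated separately, so a report taken mid-run can be off by one entry. That is usually acceptable for monitoring, and the final report after all threads have joined is exact.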
Algorithm Overview
Producer Algorithm:
for i in 0..entries_count:
    entry = generate_log_entry()
    if queue.push(entry):
        stats.entries_generated++
    else:
        stats.entries_dropped++
notify main thread of completion
Consumer (Aggregator) Algorithm:
while not shutdown or queue not empty:
    batch = []
    start_time = now()
    while batch.size < batch_size and elapsed < timeout:
        entry = queue.wait_pop(remaining_timeout)
        if entry:
            batch.append(entry)
    if batch not empty:
        write_batch_to_file(batch)
        stats.entries_written += batch.size
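The inner loop of the consumer algorithm (fill a batch until it is full or the flush timeout expires) can be sketched in C++ as follows. TimedQueue is a deliberately small stand-in for the project's queue, and collect_batch is a hypothetical helper name. Shutdown handling is left out to keep the focus on the deadline arithmetic.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>
#include <vector>

// Minimal queue with a timed blocking pop (stand-in for ThreadSafeQueue).
template <typename T>
class TimedQueue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(value));
        }
        cv_.notify_one();
    }
    std::optional<T> wait_pop(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait_for(lock, timeout, [this] { return !queue_.empty(); });
        if (queue_.empty()) return std::nullopt;  // Timed out with nothing queued
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }
private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;
};

// Collect up to batch_size items, or fewer if flush_timeout elapses first.
template <typename T>
std::vector<T> collect_batch(TimedQueue<T>& queue, std::size_t batch_size,
                             std::chrono::milliseconds flush_timeout) {
    using clock = std::chrono::steady_clock;
    const auto deadline = clock::now() + flush_timeout;  // One deadline per batch
    std::vector<T> batch;
    while (batch.size() < batch_size) {
        const auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(
            deadline - clock::now());
        if (remaining <= std::chrono::milliseconds::zero()) break;  // Time to flush
        if (auto entry = queue.wait_pop(remaining)) {
            batch.push_back(std::move(*entry));
        }
    }
    return batch;
}
```

Computing one deadline up front, rather than passing the full timeout to every wait_pop call, keeps a slow trickle of entries from postponing the flush indefinitely. steady_clock is used because it never jumps when the wall clock is adjusted.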
Shutdown Algorithm:
1. Signal all producers to stop (via atomic flag or completion count)
2. Wait for all producers to join
3. Signal queue shutdown (unblocks consumer's wait_pop)
4. Consumer drains remaining entries
5. Consumer joins
6. Report final statistics
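The six shutdown steps can be sketched end to end with a stripped-down queue. IntQueue and run_pipeline are hypothetical names, and the "work" is just counting entries. What matters here is the ordering: producers are joined before the queue is shut down, and wait_pop returns an empty optional only when the queue is both shut down and drained.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <vector>

// Minimal unbounded queue with shutdown support (stand-in for ThreadSafeQueue).
class IntQueue {
public:
    void push(int value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(value);
        }
        cv_.notify_one();
    }
    // Returns nullopt only once the queue is shut down AND fully drained.
    std::optional<int> wait_pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty() || shutdown_; });
        if (queue_.empty()) return std::nullopt;
        int value = queue_.front();
        queue_.pop();
        return value;
    }
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            shutdown_ = true;
        }
        cv_.notify_all();
    }
private:
    std::queue<int> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool shutdown_ = false;
};

// Runs the whole sequence and returns how many entries the consumer saw.
std::size_t run_pipeline(int producer_count, int entries_per_producer) {
    IntQueue queue;
    std::size_t consumed = 0;  // Touched only by the consumer until it is joined

    std::thread consumer([&] {
        while (queue.wait_pop()) ++consumed;  // Step 4: drain until nullopt
    });

    std::vector<std::thread> producers;
    for (int p = 0; p < producer_count; ++p) {
        producers.emplace_back([&queue, entries_per_producer] {
            for (int i = 0; i < entries_per_producer; ++i) queue.push(i);
        });
    }

    for (auto& t : producers) t.join();  // Steps 1-2: producers finish, then join
    queue.shutdown();                    // Step 3: unblock the consumer
    consumer.join();                     // Step 5
    return consumed;                     // Step 6: report
}
```

With this ordering, run_pipeline(3, 1000) returns 3000: nothing can be pushed after shutdown() because every producer has already been joined.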
Implementation Guide
Development Environment Setup
# Required: C++ compiler with C++17 support
# Linux
sudo apt-get install g++ cmake
# macOS
xcode-select --install
# Verify C++17 support
g++ --version # Should be GCC 7+ or Clang 5+
# Create project structure
mkdir -p log_aggregator/{src,include,tests,logs}
cd log_aggregator
Create a basic CMakeLists.txt:
cmake_minimum_required(VERSION 3.14)
project(log_aggregator)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# Enable threading
find_package(Threads REQUIRED)
# Enable ThreadSanitizer for debug builds (GCC/Clang only)
if(CMAKE_BUILD_TYPE MATCHES Debug)
    add_compile_options(-fsanitize=thread -g)
    add_link_options(-fsanitize=thread)
endif()
# ThreadSafeQueue is a header-only template, so it has no .cpp file
add_executable(log_aggregator
    src/main.cpp
    src/log_entry.cpp
    src/producer.cpp
    src/aggregator.cpp
    src/statistics.cpp
)
target_include_directories(log_aggregator PRIVATE include)
target_link_libraries(log_aggregator PRIVATE Threads::Threads)
Project Structure
log_aggregator/
├── CMakeLists.txt
├── include/
│ ├── log_entry.hpp # LogEntry struct and LogLevel enum
│ ├── thread_safe_queue.hpp # Template class for concurrent queue
│ ├── producer.hpp # Producer class
│ ├── aggregator.hpp # Consumer/Aggregator class
│ └── statistics.hpp # Thread-safe statistics
├── src/
│ ├── main.cpp # Entry point, orchestration
│ ├── log_entry.cpp # LogEntry implementation
│ ├── producer.cpp # Producer implementation
│ ├── aggregator.cpp # Aggregator implementation
│ └── statistics.cpp # Statistics implementation
├── tests/
│ ├── test_queue.cpp # Unit tests for ThreadSafeQueue
│ └── test_integration.cpp # Integration tests
└── logs/ # Output directory
The Core Question You’re Answering
“How do I safely share data between threads that produce and consume at different rates, without losing data or causing crashes?”
This question is fundamental to concurrent programming. You’ll learn that the answer involves:
- Mutual exclusion (mutex) to prevent simultaneous access
- Signaling (condition variables) for efficient coordination
- Bounded buffers to handle rate mismatches
- Graceful degradation when the buffer is full
Concepts You Must Understand First
Before starting, ensure you can answer these questions:
| Concept | Self-Assessment Question | Where to Learn |
|---|---|---|
| RAII | What happens to a std::lock_guard when it goes out of scope? | “C++ Concurrency in Action” Ch. 3.2.1 |
| Move semantics | Why can a std::thread be moved but not copied? | Any modern C++ book, Ch. on move |
| Lambda expressions | What does [&] capture? | C++ reference, lambda expressions |
| Template basics | What is a template parameter? | C++ templates introduction |
| std::optional | How do you check if an optional has a value? | C++17 features |
| std::chrono | How do you get the current time? | C++ chrono library |
Questions to Guide Your Design
Work through these questions before writing code:
Thread Lifecycle:
- How many threads will your program have in total?
- Who creates each thread? Who joins it?
- What happens if a thread throws an exception?
Data Flow:
- What data structure holds log entries?
- How do you prevent two producers from pushing simultaneously?
- How does the consumer know when new entries are available?
Shutdown:
- How do you signal that the program should stop?
- How do you ensure all queued entries are written before exit?
- What if a producer is blocked on a full queue during shutdown?
Error Handling:
- What happens if the queue is full?
- What happens if disk write fails?
- What happens if memory allocation fails?
Thinking Exercise
Before writing any code, trace through this scenario by hand:
Scenario: 2 producers, each generating 5 entries, 1 consumer, batch size 3.
Draw a timeline showing:
- When each producer acquires the mutex
- When entries are added to the queue
- When the consumer wakes up
- When batches are written
- When threads complete
Time    Producer 1     Producer 2     Queue      Consumer
----    ----------     ----------     -----      --------
T0      [start]        [start]        []         [waiting]
T1      [lock,push]    [blocked]      [E1]       [wakeup]
T2      [push]         [lock,push]    [E2,E3]    [blocked]
...
Questions to answer:
- How many times does the consumer write to disk?
- What’s the maximum queue depth reached?
- Which thread completes first?
- How long does the consumer wait at the end?
Hints in Layers
If you’re stuck, reveal hints one at a time:
Hint 1: Starting Point (Conceptual Direction)
Start with the simplest possible version that works:
- Create a single producer, single consumer
- Use a plain std::queue protected by a single std::mutex
- Have the consumer poll the queue in a loop (we’ll improve this later)
- Use std::atomic<bool> for shutdown signaling
Don’t worry about efficiency yet. Get the basic mechanics working first.
std::mutex mtx;
std::queue<LogEntry> queue;
std::atomic<bool> running{true};

void producer() {
    for (int i = 0; i < 100; ++i) {
        std::lock_guard<std::mutex> lock(mtx);
        queue.push(make_entry(i));
    }
}

void consumer() {
    while (true) {
        // Take the lock before touching the queue, including empty()
        std::lock_guard<std::mutex> lock(mtx);
        if (!queue.empty()) {
            auto entry = queue.front();
            queue.pop();
            write(entry);  // Simple but slow: the lock is held during I/O
        } else if (!running) {
            break;         // Stopped and drained
        }
    }
}
Hint 2: Next Level (More Specific Guidance)
Once basic polling works, add condition variables:
- The consumer should call cv.wait() instead of polling
- Producers should call cv.notify_one() after pushing
- Use a predicate in wait() to handle spurious wakeups
Key pattern:
// Consumer
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [&]{ return !queue.empty() || shutdown; });
// The predicate is checked while holding the lock
// If false, the lock is released and thread sleeps
// On wakeup, lock is reacquired and predicate checked again
For shutdown:
void shutdown() {
    {
        std::lock_guard<std::mutex> lock(mtx);
        shutdown_flag = true;
    }
    cv.notify_all();  // Wake consumer to check shutdown
}
Hint 3: Technical Details (Approach and Patterns)
Encapsulate the thread-safe queue as a reusable class:
template<typename T>
class ThreadSafeQueue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(value));
        }
        cv_.notify_one();
    }

    std::optional<T> wait_and_pop(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (cv_.wait_for(lock, timeout, [this]{
                return !queue_.empty() || shutdown_;
            })) {
            if (!queue_.empty()) {
                T value = std::move(queue_.front());
                queue_.pop();
                return value;
            }
        }
        return std::nullopt;  // Timed out, or shut down with nothing left
    }

    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            shutdown_ = true;
        }
        cv_.notify_all();
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool shutdown_ = false;  // Guarded by mutex_
};
For bounded queue, check size before push and return success/failure.
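A minimal sketch of the bounded variant is shown below. BoundedQueue and try_pop are illustrative names, and the condition variable is omitted to keep the size check in focus. The important detail is that the size check and the push happen under the same lock. Checking size() first and calling push() afterwards would let another producer fill the last slot in between.

```cpp
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>

template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t max_size) : max_size_(max_size) {}

    // Returns false (and drops the item) when the queue is already full.
    bool push(T value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.size() >= max_size_) return false;
        queue_.push(std::move(value));
        return true;
    }

    // Non-blocking pop: empty optional when nothing is queued.
    std::optional<T> try_pop() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) return std::nullopt;
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
    }

private:
    std::queue<T> queue_;
    mutable std::mutex mutex_;  // mutable so size() can lock in a const method
    std::size_t max_size_;
};
```

A producer can then count drops with something like `if (!queue.push(entry)) stats.entries_dropped++;`, matching the producer algorithm earlier in this guide.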
Hint 4: Tools and Debugging (Verification Methods)
Enable ThreadSanitizer to catch race conditions:
cmake -DCMAKE_BUILD_TYPE=Debug ..
make
./log_aggregator
# Any races will be reported with stack traces
Common TSan errors and fixes:
- “data race on location X”: You forgot to lock before accessing X
- “lock-order-inversion”: Potential deadlock detected
Debug with logging (temporarily):
std::mutex cout_mutex;
// do { ... } while (0) makes LOG(x); behave as one statement, even inside if/else
#define LOG(msg) do { std::lock_guard<std::mutex> l(cout_mutex); \
    std::cout << "[" << std::this_thread::get_id() << "] " << msg << "\n"; } while (0)
Test edge cases:
- Single producer, single consumer
- Many producers, single consumer (contention)
- Empty queue shutdown
- Full queue (test backpressure)
- Rapid shutdown (before producers complete)
The Interview Questions They’ll Ask
After completing this project, you’ll be ready for these common interview questions:
- “Explain the difference between a data race and a race condition.”
  - Data race: Two threads access the same memory, at least one writes, no synchronization. Undefined behavior in C++.
  - Race condition: Logic error where correctness depends on timing. Can exist even with proper synchronization.
  - Example: Two threads incrementing a counter without locks is a data race. Two threads each checking if a seat is available then booking it is a race condition.
- “When would you use a mutex vs an atomic variable?”
  - Atomic: Single variable, simple operations (load, store, increment). Usually lock-free for small types such as bool and integers.
  - Mutex: Multiple variables that must be updated together, complex operations. May block.
  - Example: A single counter can be atomic. A queue (head pointer + tail pointer + data) needs a mutex.
- “What is a condition variable and why is it better than polling?”
  - A condition variable lets a thread sleep until a condition becomes true.
  - Polling wastes CPU cycles checking repeatedly.
  - CPU usage: a busy-polling thread can keep a core near 100%; a thread blocked on a condition variable uses essentially none.
  - Must be used with a mutex; must handle spurious wakeups with a predicate.
- “How would you implement a thread-safe queue?”
  - std::queue + std::mutex + std::condition_variable
  - Lock before push/pop, notify after push, wait before pop
  - Discuss: bounded vs unbounded, blocking vs non-blocking pop
- “What are spurious wakeups and how do you handle them?”
  - The OS may wake a thread even without notify_one/all being called
  - Always use wait() with a predicate lambda that checks the actual condition
  - Loop: wake -> lock -> check predicate -> if false, sleep again
- “Describe the producer-consumer pattern and its common issues.”
  - Pattern: producers generate work, put in buffer, consumers take and process
  - Issues: buffer full (producers block or drop), buffer empty (consumers wait), shutdown coordination
  - Solutions: bounded buffer, condition variables, graceful shutdown protocol
- “How would you debug a deadlock?”
  - Use -fsanitize=thread to detect lock-order inversions (potential deadlocks)
  - Use gdb to attach to the hung process, then thread apply all bt to see all stack traces
  - Common cause: acquiring locks in different order across threads
  - Prevention: always acquire locks in the same order, use std::scoped_lock for multiple locks
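The seat-booking example from the first question can be made concrete. In the sketch below every access is atomic, so there is no data race, yet book_seat_racy still has a race condition because another thread can act between its check and its store. The function names and the driver are hypothetical.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Race condition without a data race: each access is atomic, but another
// thread can book the seat between the check and the store.
bool book_seat_racy(std::atomic<bool>& seat_taken) {
    if (!seat_taken.load()) {    // check
        seat_taken.store(true);  // act: two threads can both reach this line
        return true;
    }
    return false;
}

// Fix: make check-and-act one atomic step. Exactly one caller sees 'false'
// and swaps in 'true'; every other caller gets false back.
bool book_seat(std::atomic<bool>& seat_taken) {
    bool expected = false;
    return seat_taken.compare_exchange_strong(expected, true);
}

// Hypothetical driver: how many of thread_count threads booked the one seat?
int count_successful_bookings(int thread_count) {
    std::atomic<bool> seat_taken{false};
    std::atomic<int> successes{0};
    std::vector<std::thread> threads;
    for (int i = 0; i < thread_count; ++i) {
        threads.emplace_back([&] {
            if (book_seat(seat_taken)) successes.fetch_add(1);
        });
    }
    for (auto& t : threads) t.join();
    return successes.load();
}
```

With book_seat the driver always reports exactly one successful booking. Swapping in book_seat_racy can report more than one, although the failure depends on timing and may take many runs to appear.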
Books That Will Help
| Topic | Book | Chapter |
|---|---|---|
| std::thread basics | C++ Concurrency in Action, 2nd Ed | Chapter 2: Managing threads |
| Mutex and Lock Guards | C++ Concurrency in Action, 2nd Ed | Chapter 3: Sharing data between threads |
| Condition variables | C++ Concurrency in Action, 2nd Ed | Chapter 4.1: Waiting for events |
| Thread-safe queue design | C++ Concurrency in Action, 2nd Ed | Chapter 6.2: Lock-based concurrent data structures |
| Atomics introduction | C++ Concurrency in Action, 2nd Ed | Chapter 5.2: Atomic operations and types in C++ |
| General concurrency theory | The Art of Multiprocessor Programming | Chapters 1-3 |
Implementation Phases
Phase 1: Basic Thread Management (Day 1, ~2-3 hours)
Goals:
- Launch multiple threads
- Understand thread lifecycle
- Get basic synchronization working
Tasks:
- Create a simple program with 3 producer threads that print their thread ID
- Add a shared counter, protected by mutex, that each producer increments
- Join all threads and print final counter value
- Verify with ThreadSanitizer
Checkpoint: Counter equals expected value, TSan reports no errors.
// Minimal starting point
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

int main() {
    std::mutex mtx;
    int counter = 0;

    auto worker = [&](int id) {
        for (int i = 0; i < 1000; ++i) {
            std::lock_guard<std::mutex> lock(mtx);
            ++counter;
        }
        std::cout << "Worker " << id << " done\n";
    };

    std::vector<std::thread> threads;
    for (int i = 0; i < 3; ++i) {
        threads.emplace_back(worker, i);
    }
    for (auto& t : threads) {
        t.join();
    }
    std::cout << "Counter: " << counter << " (expected: 3000)\n";
}
Phase 2: Thread-Safe Queue (Day 1, ~3-4 hours)
Goals:
- Implement basic ThreadSafeQueue with mutex protection
- Add blocking wait with condition variable
- Test with single producer, single consumer
Tasks:
- Create ThreadSafeQueue<T> class with push() and try_pop()
- Add wait_and_pop() using condition variable
- Add shutdown() method to unblock waiting threads
- Write unit test that pushes 1000 items and pops them
Checkpoint: Queue correctly handles concurrent access, no items lost.
Phase 3: Producer and Consumer (Day 2, ~3-4 hours)
Goals:
- Create Producer class that generates log entries
- Create Aggregator class that consumes and batches
- Wire them together
Tasks:
- Define LogEntry struct with timestamp, level, thread_id, message
- Implement Producer::operator()() that generates N entries
- Implement Aggregator::operator()() that consumes and writes batches
- Test with 1 producer, 1 consumer, verify all entries written
Checkpoint: Log files contain correct number of properly formatted entries.
Phase 4: Multiple Producers (Day 2, ~2-3 hours)
Goals:
- Scale to multiple producers
- Handle contention
- Measure throughput
Tasks:
- Create vector of producer threads
- Ensure thread-safe statistics collection
- Add timing measurements
- Report throughput (entries/second)
Checkpoint: System handles 3+ producers without data loss or corruption.
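For the timing and throughput tasks, a small sketch using std::chrono::steady_clock is shown below. The helper names entries_per_second and measure_throughput are hypothetical. steady_clock is chosen because it is monotonic, so the measurement is unaffected by wall-clock adjustments.

```cpp
#include <chrono>
#include <cstdint>
#include <utility>

// Entries per second over a measured interval; 0 for an empty interval.
double entries_per_second(std::uint64_t entries,
                          std::chrono::steady_clock::duration elapsed) {
    const double seconds = std::chrono::duration<double>(elapsed).count();
    return seconds > 0.0 ? static_cast<double>(entries) / seconds : 0.0;
}

// Times an arbitrary callable that is assumed to process 'entries' items.
template <typename Work>
double measure_throughput(std::uint64_t entries, Work&& work) {
    const auto start = std::chrono::steady_clock::now();
    std::forward<Work>(work)();
    return entries_per_second(entries, std::chrono::steady_clock::now() - start);
}
```

As a check against the sample statistics earlier in this guide, 15000 entries over 4.101 s works out to roughly 3657.6 entries per second, which the sample report shows truncated to 3657.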
Phase 5: Graceful Shutdown (Day 2, ~2 hours)
Goals:
- Clean shutdown on signal or completion
- Drain queue before exit
- Report final statistics
Tasks:
- Add signal handler for SIGINT (Ctrl+C)
- Implement shutdown sequence: stop producers -> drain queue -> join consumer
- Collect and report final statistics
Checkpoint: Ctrl+C triggers clean shutdown with all queued entries written.
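A signal handler runs in a very restricted context: it must not lock a mutex, allocate memory, or call notify on a condition variable. One common approach, sketched below, is to have the handler set a flag and let the main thread notice it and run the normal shutdown sequence. The names g_stop_requested, install_sigint_handler, and stop_requested are hypothetical.

```cpp
#include <csignal>

// volatile std::sig_atomic_t (or a lock-free std::atomic) is what a signal
// handler may safely write.
volatile std::sig_atomic_t g_stop_requested = 0;

extern "C" void handle_sigint(int) {
    g_stop_requested = 1;  // Only set a flag: no locks, no I/O, no allocation
}

void install_sigint_handler() {
    std::signal(SIGINT, handle_sigint);
}

bool stop_requested() {
    return g_stop_requested != 0;
}
```

The main thread can check stop_requested() between short sleeps or timed waits, then stop the producers, shut down the queue, and join the consumer as in the shutdown algorithm. Because the handler cannot notify the condition variable itself, the consumer's timed wait is what bounds how long shutdown takes to be noticed.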
Phase 6: Polish and Edge Cases (Day 2, ~2-3 hours)
Goals:
- Handle edge cases
- Optimize if needed
- Final testing
Tasks:
- Test empty queue shutdown
- Test full queue backpressure
- Add command-line argument parsing
- Run ThreadSanitizer one final time
- Write documentation
Checkpoint: System is robust, documented, and ready for portfolio.
Key Implementation Decisions
| Decision | Options | Recommendation | Rationale |
|---|---|---|---|
| Queue bound | Unbounded vs Bounded | Bounded (10000) | Prevents memory exhaustion under load |
| When queue full | Block vs Drop | Drop with warning | Prevents deadlock, maintains responsiveness |
| Consumer pop | Blocking vs Polling | Blocking with timeout | Efficient, allows periodic batch flush |
| Batch trigger | Size only vs Size+Timeout | Both (1000 or 100ms) | Handles both high and low load |
| Shutdown signal | Atomic flag vs Poison pill | Atomic flag + CV notify | Cleaner, no special queue values |
| Log format | Binary vs Text | Text (JSON or CSV) | Human-readable, easy debugging |
Testing Strategy
Test Categories
| Category | Purpose | Examples |
|---|---|---|
| Unit Tests | Test individual components | ThreadSafeQueue push/pop correctness |
| Integration Tests | Test component interaction | Producer -> Queue -> Consumer flow |
| Stress Tests | Find issues under load | 100 producers, 1M entries |
| Race Detection | Find synchronization bugs | Run with ThreadSanitizer |
| Edge Cases | Test boundary conditions | Empty queue, full queue, rapid shutdown |
Critical Test Cases
- Single Producer, Single Consumer:
  - Push 1000 entries, verify all consumed
  - Order preserved (FIFO)
- Multiple Producers, Single Consumer:
  - 3 producers, 5000 entries each
  - Total entries written = 15000
  - No duplicates, no missing
- Full Queue Backpressure:
  - Queue size = 100, produce 1000 rapidly
  - Verify graceful handling (drop or block)
- Shutdown During Production:
  - Start producing, send shutdown after 50%
  - All queued entries should be written
- Empty Queue Shutdown:
  - Produce nothing, send shutdown
  - Consumer exits cleanly
- Rapid Start/Stop:
  - Start and immediately shutdown
  - No crashes, no hangs
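The "no duplicates, no missing" and FIFO checks are easier to automate if each test entry carries its producer id and a per-producer sequence number. The sketch below assumes that tagging scheme, no dropped entries, and a single consumer. Tagged and verify_consumed are hypothetical names for the test harness.

```cpp
#include <cstddef>
#include <map>
#include <vector>

struct Tagged {
    int producer;  // 0-based producer id
    int sequence;  // 0, 1, 2, ... in the order that producer pushed
};

// True if 'consumed' holds exactly sequences 0..per_producer-1 for each
// producer, with every producer's entries appearing in ascending order.
bool verify_consumed(const std::vector<Tagged>& consumed,
                     int producers, int per_producer) {
    if (consumed.size() != static_cast<std::size_t>(producers) * per_producer) {
        return false;  // Something was lost or duplicated
    }
    std::map<int, int> next_expected;  // producer id -> next sequence number
    for (const Tagged& e : consumed) {
        if (e.producer < 0 || e.producer >= producers) return false;
        if (e.sequence >= per_producer) return false;
        if (e.sequence != next_expected[e.producer]) return false;  // dup, gap, or reorder
        ++next_expected[e.producer];
    }
    return true;
}
```

Entries from different producers may interleave in any order. Only the order within each producer is fixed, because each producer pushes sequentially into a FIFO queue drained by one consumer.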
Testing with ThreadSanitizer
# Build with TSan
mkdir build && cd build
cmake -DCMAKE_BUILD_TYPE=Debug ..
make
# Run and watch for race reports
./log_aggregator --producers 3 --entries 1000
# Expected output on success:
# (normal program output, no TSan warnings)
# Example TSan error (if there's a race):
# WARNING: ThreadSanitizer: data race
# Write of size 4 at 0x7f... by thread T1:
# #0 Producer::run() producer.cpp:42
# Previous read of size 4 at 0x7f... by thread T2:
# #0 Aggregator::process() aggregator.cpp:58
Common Pitfalls & Debugging
Frequent Mistakes
| Pitfall | Symptom | Solution |
|---|---|---|
| Forgetting to lock | Garbled output, crashes, TSan errors | Always use lock_guard before shared access |
| Lock held during I/O | Poor performance, potential deadlock | Release lock before disk writes |
| Missing notify after push | Consumer hangs forever | Call cv.notify_one() after every push |
| No spurious wakeup handling | Consumer processes empty queue | Always use predicate with cv.wait() |
| Joining a non-joinable thread | std::system_error thrown (std::terminate if uncaught) | Check joinable() before join() |
| Forgetting to join threads | std::terminate called when a joinable std::thread is destroyed | Always join or detach all threads |
| Lock order violation | Deadlock | Always acquire locks in consistent order |
| Using wrong lock type | Compilation error or deadlock | Use unique_lock with condition_variable |
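The lock-order pitfall does not arise in the basic project, which has a single mutex, but it appears as soon as two locks must be held together. As a sketch of the std::scoped_lock remedy, the hypothetical Account example below runs transfers in opposite directions concurrently without deadlocking.

```cpp
#include <mutex>
#include <thread>

struct Account {
    std::mutex mutex;
    long balance = 0;
};

// std::scoped_lock acquires both mutexes with a deadlock-avoidance algorithm,
// so transfer(a, b) and transfer(b, a) can safely run at the same time.
void transfer(Account& from, Account& to, long amount) {
    if (&from == &to) return;  // Locking the same mutex twice is undefined behavior
    std::scoped_lock lock(from.mutex, to.mutex);
    from.balance -= amount;
    to.balance += amount;
}

// Hypothetical driver: two threads transfer in opposite directions.
long run_opposing_transfers(int rounds) {
    Account a, b;
    a.balance = 1000;
    b.balance = 1000;
    std::thread t1([&] { for (int i = 0; i < rounds; ++i) transfer(a, b, 1); });
    std::thread t2([&] { for (int i = 0; i < rounds; ++i) transfer(b, a, 1); });
    t1.join();
    t2.join();
    return a.balance + b.balance;  // Money is conserved: always 2000
}
```

Had each direction used two separate std::lock_guard objects (from first, then to), the two threads could each hold one mutex while waiting for the other.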
Debugging Strategies
Symptom: Program hangs
- Attach with gdb: gdb -p <pid>
- Run thread apply all bt to see where each thread is stuck
- Look for threads waiting on mutex or condition variable
- Check if notify is being called when expected
Symptom: Data corruption
- Enable ThreadSanitizer
- Add logging to track entry flow
- Verify counts: entries_generated == entries_written
Symptom: Memory grows unbounded
- Check queue size periodically
- Verify consumer is actually consuming
- Add maximum queue size with backpressure
Symptom: Not all entries written
- Ensure shutdown waits for queue drain
- Check for early exit in consumer loop
- Verify final batch flush
Debugging Checklist
When something goes wrong:
- Is TSan enabled? Build with -fsanitize=thread
- Are all shared variables protected by mutex?
- Are all threads joined before main exits?
- Is condition_variable used with predicate?
- Is notify_one/all called after pushing?
- Is shutdown properly signaling all waiting threads?
- Are you releasing the lock before I/O operations?
Extensions & Challenges
Beginner Extensions
- Add log levels: Filter by DEBUG/INFO/WARN/ERROR
- Log rotation: Start new file after N entries or N bytes
- Timestamps: Add microsecond precision timestamps
- JSON output: Format entries as JSON for easier parsing
Intermediate Extensions
- Multiple consumers: Partition entries across N aggregator threads
- Priority queue: Process ERROR entries before DEBUG
- Disk I/O thread: Separate disk writes from aggregation
- Back-pressure notification: Producers slow down when queue is filling
Advanced Extensions
- Lock-free queue: Replace mutex queue with lock-free MPSC queue
- Memory-mapped output: Use mmap for faster disk writes
- Network logging: Send entries over TCP to log server
- Metrics endpoint: HTTP endpoint exposing real-time statistics
Challenge: Make It Production-Ready
Add these features to make it deployment-worthy:
- Configuration file: YAML/JSON config instead of command-line args
- Graceful restart: Reload config without losing entries
- Health checks: Is the system running correctly?
- Alerting: Notify if queue depth exceeds threshold
- Compression: Compress log files to save space
Real-World Connections
Industry Applications
This pattern is used in:
- syslog-ng / rsyslog: Enterprise log aggregation systems use exactly this pattern
- Apache Kafka: Producers push to partitions, consumers pull and process
- Elasticsearch: Bulk indexing uses producer-consumer queues
- Game engines: Input events -> Game thread -> Render thread
- Video processing: Decode thread -> Process thread -> Encode thread
How Production Systems Do It
How production systems differ from this project:
| Aspect | This Project | Production System |
|---|---|---|
| Queue | In-memory, bounded | Often persistent (disk/network) |
| Consumers | Single thread | Worker pool with load balancing |
| Durability | Lost on crash | Write-ahead log, replication |
| Monitoring | Basic stats | Prometheus metrics, alerts |
| Config | Command-line | Configuration service |
What You’d Add for Production
- Persistence: Don’t lose data on crash
- Clustering: Multiple machines for scale and fault tolerance
- Partitioning: Distribute load across consumers
- Dead letter queue: Handle messages that fail processing
- Exactly-once semantics: Prevent duplicate processing
Resources
Essential Documentation
- std::thread - Thread creation and management
- std::mutex - Mutual exclusion
- std::condition_variable - Thread coordination
- std::atomic - Atomic operations
Video Resources
- CppCon talks by Anthony Williams (author of C++ Concurrency in Action)
- “Back to Basics: Concurrency” from recent CppCon sessions
Tools
- ThreadSanitizer: -fsanitize=thread flag for GCC/Clang
- Helgrind: Valgrind tool for thread error detection
- gdb: Attach to hung process, examine thread states
Related Projects
- Previous: None (this is the first project)
- Next: P02 (Promise-Based Task Coordinator) builds on future/promise concepts
Self-Assessment Checklist
Before considering this project complete, verify:
Understanding
- I can explain what a data race is and how to prevent it
- I can explain why condition variables are better than polling
- I can explain what spurious wakeups are and how to handle them
- I can draw the producer-consumer pattern and explain data flow
- I can explain when to use lock_guard vs unique_lock
Implementation
- Multiple producers push entries without corruption
- Consumer waits efficiently (no CPU-burning poll loop)
- Queue has maximum size with graceful backpressure handling
- Shutdown drains all remaining entries
- Statistics accurately track all entries
- No ThreadSanitizer warnings
Practical Skills
- I can compile with -fsanitize=thread and interpret errors
- I can use gdb to debug a hung threaded program
- I can measure throughput and identify bottlenecks
- I can explain my design decisions in an interview setting
Submission / Completion Criteria
Minimum Viable Completion
- 3 producers, 1 consumer working correctly
- Log entries written to files in correct format
- No data races (passes TSan)
- Basic shutdown works
Full Completion
- Configurable producers/entries/batch-size via command-line
- Condition variable for efficient waiting (no polling)
- Bounded queue with backpressure handling
- Graceful shutdown with queue draining
- Statistics report at end
- Documented code with README
Excellence (Going Above & Beyond)
- Signal handler for Ctrl+C
- Log rotation
- Multiple consumers
- Lock-free queue implementation
- Comprehensive test suite
- Performance benchmarks with analysis
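For the Ctrl+C item, a signal handler should do as little as possible, typically just set a flag. The sketch below uses std::signal with a volatile std::sig_atomic_t flag; install_sigint_handler and shutdown_requested are hypothetical helper names, and how the flag is turned into a queue shutdown is left to the rest of the program.

```cpp
#include <csignal>

// Set by the handler, read by normal code. volatile std::sig_atomic_t is
// the type the standard guarantees can be written safely from a handler.
volatile std::sig_atomic_t g_shutdown_requested = 0;

// Keep the handler trivial: no locks, no allocation, no I/O.
void on_sigint(int) {
    g_shutdown_requested = 1;
}

// Returns false if the handler could not be installed.
bool install_sigint_handler() {
    return std::signal(SIGINT, on_sigint) != SIG_ERR;
}

bool shutdown_requested() {
    return g_shutdown_requested != 0;
}
```

Setting the flag does not wake a consumer blocked on a condition variable. One approach is for the main thread to check shutdown_requested() periodically and then run the normal shutdown path (signal the queue, notify all waiters, join the threads).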
This guide was expanded from LEARN_CPP_CONCURRENCY_AND_PARALLELISM.md. For the complete learning path, see the project index.