Project 21: Thread Pool Implementation
Build a reusable thread pool library with a bounded work queue, condition variables, and clean shutdown semantics. Master the producer-consumer pattern that powers modern concurrent systems.
Quick Reference
| Attribute | Value |
|---|---|
| Language | C (alt: Rust, Go, Java) |
| Difficulty | Advanced |
| Time | 1-2 weeks |
| Chapters | 12 |
| Coolness | ★★★★☆ Production-Ready |
| Portfolio Value | High (servers, systems) |
Learning Objectives
By completing this project, you will:
- Master the producer-consumer pattern: Implement a correct bounded buffer with proper blocking semantics on full/empty conditions
- Understand condition variables deeply: Learn when to use signal vs broadcast, why while loops are essential, and how to avoid spurious wakeups
- Build thread lifecycle management: Create, coordinate, and cleanly terminate multiple worker threads
- Implement backpressure handling: Design systems that gracefully handle overload without losing work
- Develop shutdown choreography skills: Coordinate clean shutdown across multiple threads without deadlocks or lost tasks
- Apply synchronization primitives correctly: Use mutexes and condition variables together following proven patterns
- Debug concurrency issues systematically: Use tools like helgrind, ThreadSanitizer, and careful logging to find race conditions
- Connect thread pools to real systems: Integrate your pool with network servers, seeing how production systems handle concurrent requests
- Measure and optimize concurrent performance: Understand throughput, latency, and resource utilization trade-offs
- Build intuition for thread count tuning: Learn why “more threads” isn’t always better and how to size pools appropriately
Deep Theoretical Foundation
Why Thread Pools Exist
Thread pools solve fundamental problems with naive concurrent designs:
┌────────────────────────────────────────────────────────────────────────────────┐
│ THE PROBLEM WITH THREAD-PER-REQUEST │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ NAIVE APPROACH: Create a new thread for each request │
│ ────────────────────────────────────────────────────── │
│ │
│ Request arrives ──► pthread_create() ──► Handle ──► pthread_exit() │
│ │
│ PROBLEMS: │
│ │
│ 1. Thread Creation Overhead │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ Operation Time (approx) Frequency │ │
│ ├──────────────────────────────────────────────────────────────────┤ │
│ │ pthread_create() 10-50 µs Per request │ │
│ │ Stack allocation 4-8 KB Per thread │ │
│ │ Kernel thread setup Variable Per request │ │
│ │ Thread destruction 5-20 µs Per request │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ With 10,000 requests/second: 100-500 ms wasted on thread overhead! │
│ │
│ 2. Unbounded Resource Consumption │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Requests Threads Memory (8KB stack) Impact │ │
│ │ ───────── ─────── ───────────────── ────── │ │
│ │ 100 100 800 KB Fine │ │
│ │ 1,000 1,000 8 MB Concerning │ │
│ │ 10,000 10,000 80 MB Problematic │ │
│ │ 100,000 100,000 800 MB System crash! │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ 3. Context Switch Overhead │
│ - OS scheduler thrashes with thousands of threads │
│ - Cache pollution from thread switching │
│ - Priority inversion problems │
│ │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ SOLUTION: THREAD POOL │
│ ──────────────────────── │
│ │
│ ┌─────────┐ ┌─────────────────────┐ ┌───────────────────────┐ │
│ │ Request │────►│ Bounded Queue │────►│ Fixed Worker Pool │ │
│ │ Flow │ │ (backpressure) │ │ (N threads) │ │
│ └─────────┘ └─────────────────────┘ └───────────────────────┘ │
│ │
│ Benefits: │
│ ✓ Thread creation cost amortized over many requests │
│ ✓ Bounded resource usage (N threads max) │
│ ✓ Natural backpressure when overloaded │
│ ✓ Better cache locality (same threads, warm caches) │
│ ✓ Predictable behavior under load │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
The Producer-Consumer Problem
The thread pool is built on the classic producer-consumer pattern:
┌────────────────────────────────────────────────────────────────────────────────┐
│ PRODUCER-CONSUMER PATTERN │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ PRODUCERS BUFFER CONSUMERS │
│ (submit work) (queue) (do work) │
│ │
│ ┌──────────┐ ┌──────────┐ │
│ │Producer 1│──┐ ┌──►│Consumer 1│ │
│ └──────────┘ │ │ └──────────┘ │
│ │ ┌─────────────────────────────┐ │ │
│ ┌──────────┐ │ │ BOUNDED BUFFER │ │ ┌──────────┐ │
│ │Producer 2│──┼──►│ ┌───┬───┬───┬───┬───┬───┐ │─┼──►│Consumer 2│ │
│ └──────────┘ │ │ │ T │ T │ T │ │ │ │ │ │ └──────────┘ │
│ │ │ └───┴───┴───┴───┴───┴───┘ │ │ │
│ ┌──────────┐ │ │ ▲ ▲ │ │ ┌──────────┐ │
│ │Producer 3│──┘ │ │ │ │ └──►│Consumer 3│ │
│ └──────────┘ │ rear front │ └──────────┘ │
│ └─────────────────────────────┘ │
│ │
│ SYNCHRONIZATION REQUIREMENTS: │
│ ─────────────────────────────── │
│ │
│ 1. MUTUAL EXCLUSION: Only one thread modifies buffer at a time │
│ - Protects: rear, front, count, buffer contents │
│ - Mechanism: Mutex lock │
│ │
│ 2. PRODUCER BLOCKING: When buffer is FULL │
│ - Producer must wait for space │
│ - Mechanism: Condition variable (not_full) │
│ │
│ 3. CONSUMER BLOCKING: When buffer is EMPTY │
│ - Consumer must wait for work │
│ - Mechanism: Condition variable (not_empty) │
│ │
│ 4. SIGNALING: Notify waiters when state changes │
│ - Producer signals: buffer became non-empty │
│ - Consumer signals: buffer became non-full │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
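The four requirements above map directly onto a small set of pthread primitives. Below is a minimal sketch of the shared state and its initialization; names like `bounded_buf_t` and `bb_init` are illustrative, not a standard API:

```c
#include <pthread.h>
#include <stdlib.h>

// One mutex guards all fields (requirement 1); two condition variables
// express the two distinct wait conditions (requirements 2 and 3).
typedef struct {
    void **slots;             // circular array of task pointers
    int capacity;             // fixed maximum
    int count;                // current number of items
    int front, rear;          // dequeue / enqueue indices
    pthread_mutex_t lock;     // mutual exclusion
    pthread_cond_t not_full;  // producers wait here when full
    pthread_cond_t not_empty; // consumers wait here when empty
} bounded_buf_t;

// Initialize an empty buffer; returns 0 on success, -1 on failure.
int bb_init(bounded_buf_t *b, int capacity) {
    b->slots = malloc(sizeof(void *) * (size_t)capacity);
    if (!b->slots) return -1;
    b->capacity = capacity;
    b->count = b->front = b->rear = 0;
    pthread_mutex_init(&b->lock, NULL);
    pthread_cond_init(&b->not_full, NULL);
    pthread_cond_init(&b->not_empty, NULL);
    return 0;
}
```

The signaling in requirement 4 lives in the enqueue/dequeue operations themselves, shown later in the condition-variable producer-consumer code.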
Condition Variables: The Heart of the Pattern
Condition variables enable threads to efficiently wait for conditions to become true:
┌────────────────────────────────────────────────────────────────────────────────┐
│ CONDITION VARIABLE MECHANICS │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ WHY NOT JUST USE A MUTEX? │
│ ───────────────────────────── │
│ │
│ Busy-waiting (BAD): │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ while (1) { │ │
│ │ pthread_mutex_lock(&lock); │ │
│ │ if (queue_not_empty) { │ │
│ │ // take item │ │
│ │ pthread_mutex_unlock(&lock); │ │
│ │ break; │ │
│ │ } │ │
│ │ pthread_mutex_unlock(&lock); │ │
│ │ // Spin! Wastes CPU cycles! 🔥 │ │
│ │ } │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
│ With condition variable (GOOD): │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ pthread_mutex_lock(&lock); │ │
│ │ while (!queue_not_empty) { │ │
│ │ // Atomically: release lock + sleep │ │
│ │ pthread_cond_wait(&cond, &lock); │ │
│ │ // On wake: re-acquire lock │ │
│ │ } │ │
│ │ // take item │ │
│ │ pthread_mutex_unlock(&lock); │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ pthread_cond_wait() ATOMIC OPERATION: │
│ ─────────────────────────────────────── │
│ │
│ SINGLE ATOMIC STEP │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ 1. Release the mutex │ │
│ │ 2. Add thread to condition variable's wait queue │ │
│ │ 3. Block (sleep) until signaled │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ ON WAKEUP (also atomic) │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ 4. Remove from wait queue │ │
│ │ 5. Re-acquire the mutex (may block if contended) │ │
│ │ 6. Return from pthread_cond_wait() │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ WHY ATOMIC? Without atomicity: │
│ ───────────────────────────────── │
│ Thread A: unlock() ─┐ │
│ │ ◄── Gap! Signal could arrive here and be lost! │
│ Thread A: sleep() ─┘ │
│ │
│ Thread B: signal() ──── No one waiting yet = signal lost forever! │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
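The correct pattern above condenses into a small runnable demo. Here `wait_demo` (an illustrative name) blocks until a second thread flips the predicate; because the setter signals while holding the mutex and the waiter re-checks the predicate in a loop, no wakeup can be lost:

```c
#include <pthread.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
static int ready = 0;

static void *setter(void *arg) {
    (void)arg;
    pthread_mutex_lock(&lock);
    ready = 1;                   // change the predicate...
    pthread_cond_signal(&cond);  // ...then signal, still holding the lock
    pthread_mutex_unlock(&lock);
    return NULL;
}

// Returns the predicate's value after a correct condition-variable wait.
int wait_demo(void) {
    pthread_t t;
    pthread_create(&t, NULL, setter, NULL);
    pthread_mutex_lock(&lock);
    while (!ready)                        // while, not if: re-check on wake
        pthread_cond_wait(&cond, &lock);  // atomically unlocks + sleeps
    pthread_mutex_unlock(&lock);
    pthread_join(t, NULL);
    return ready;
}
```

Whichever thread wins the race for the lock, the outcome is the same: either the waiter sees `ready == 1` and never sleeps, or it sleeps and the signal wakes it.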
Signal vs Broadcast: When to Use Each
┌────────────────────────────────────────────────────────────────────────────────┐
│ SIGNAL vs BROADCAST │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ pthread_cond_signal(&cond) │
│ ────────────────────────────── │
│ - Wakes ONE waiting thread (unspecified which one) │
│ - More efficient when only one thread can proceed │
│ - Use for: "exactly one resource became available" │
│ │
│ Example: Producer adds ONE item to queue │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ enqueue(item); │ │
│ │ pthread_cond_signal(¬_empty); // Wake one consumer │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
│ pthread_cond_broadcast(&cond) │
│ ────────────────────────────────── │
│ - Wakes ALL waiting threads │
│ - Less efficient (thundering herd) │
│ - Use for: "state changed, everyone should re-check" │
│ │
│ Example: Shutdown signal │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ pool->shutdown = 1; │ │
│ │ pthread_cond_broadcast(¬_empty); // Wake ALL workers │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ THUNDERING HERD PROBLEM: │
│ ────────────────────────── │
│ │
│ 10 workers waiting ──► broadcast() ──► All 10 wake up │
│ Only 1 gets the item │
│ 9 go back to sleep │
│ │
│ Wasted: 9 context switches, 9 mutex acquisitions │
│ │
│ SOLUTION: Use signal() for normal operations, broadcast() only for shutdown │
│ │
│ DECISION MATRIX: │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ Scenario Use │ │
│ ├──────────────────────────────────────────────────────────────────────┤ │
│ │ One item added to queue signal() - one consumer needed │ │
│ │ One slot freed in buffer signal() - one producer needed │ │
│ │ Shutdown requested broadcast() - all must wake │ │
│ │ Configuration changed broadcast() - all must re-check │ │
│ │ Multiple items added at once Could signal() N times or │ │
│ │ broadcast() once │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
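The shutdown row of the decision matrix can be sketched directly. `broadcast_demo` (an illustrative name) parks n workers on `not_empty`, then wakes all of them with one broadcast; a single `signal()` here would leave n-1 of them asleep forever:

```c
#include <pthread.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  not_empty = PTHREAD_COND_INITIALIZER;
static int shutdown_flag = 0;
static int exited = 0;

static void *worker(void *arg) {
    (void)arg;
    pthread_mutex_lock(&lock);
    while (!shutdown_flag)                 // no work in this sketch
        pthread_cond_wait(&not_empty, &lock);
    exited++;                              // count clean exits
    pthread_mutex_unlock(&lock);
    return NULL;
}

// Start n workers, broadcast shutdown, join them; returns the exit count.
int broadcast_demo(int n) {
    pthread_t t[16];
    if (n > 16) n = 16;
    for (int i = 0; i < n; i++)
        pthread_create(&t[i], NULL, worker, NULL);
    pthread_mutex_lock(&lock);
    shutdown_flag = 1;
    pthread_cond_broadcast(&not_empty);    // wake ALL waiting workers
    pthread_mutex_unlock(&lock);
    for (int i = 0; i < n; i++)
        pthread_join(t[i], NULL);
    return exited;
}
```

Workers that happen to lock the mutex after the broadcast see `shutdown_flag == 1` and skip the wait entirely, so the demo is race-free either way.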
Why While Loops Are Essential: The Spurious Wakeup Problem
┌────────────────────────────────────────────────────────────────────────────────┐
│ THE SPURIOUS WAKEUP PROBLEM │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ WRONG CODE (using if): │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ pthread_mutex_lock(&lock); │ │
│ │ if (queue_size == 0) { // IF - checks once! │ │
│ │ pthread_cond_wait(¬_empty, &lock); │ │
│ │ } │ │
│ │ item = dequeue(); // CRASH! Queue might be empty│ │
│ │ pthread_mutex_unlock(&lock); │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
│ CORRECT CODE (using while): │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ pthread_mutex_lock(&lock); │ │
│ │ while (queue_size == 0) { // WHILE - re-checks! │ │
│ │ pthread_cond_wait(¬_empty, &lock); │ │
│ │ } │ │
│ │ item = dequeue(); // Safe - we verified │ │
│ │ pthread_mutex_unlock(&lock); │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ WHY SPURIOUS WAKEUPS HAPPEN: │
│ ───────────────────────────── │
│ │
│ 1. Implementation artifacts: │
│ - Some OSes wake threads on signals (EINTR) │
│ - pthread_cond_wait may return without a signal │
│ - POSIX explicitly allows this for efficiency │
│ │
│ 2. Multiple waiters scenario: │
│ │
│ Timeline: │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ T0: Worker A waiting on empty queue │ │
│ │ T1: Worker B waiting on empty queue │ │
│ │ T2: Producer adds ONE item, signals │ │
│ │ T3: Worker A wakes, acquires lock │ │
│ │ T4: Worker B ALSO wakes (broadcast or implementation detail) │ │
│ │ T5: Worker A dequeues the item │ │
│ │ T6: Worker A unlocks │ │
│ │ T7: Worker B acquires lock... QUEUE IS EMPTY! │ │
│ └───────────────────────────────────────────────────────────────────┘ │
│ │
│ With 'if': Worker B crashes (tries to dequeue from empty) │
│ With 'while': Worker B re-checks, sees empty, waits again ✓ │
│ │
│ 3. Signal vs actual state: │
│ │
│ - Signal means "state MIGHT have changed" │
│ - NOT "state HAS changed in a way you can use" │
│ - Always verify the condition after waking! │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
Thread Lifecycle and Synchronization
┌────────────────────────────────────────────────────────────────────────────────┐
│ THREAD POOL LIFECYCLE │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ INITIALIZATION │
│ ────────────────── │
│ │
│ Main Thread │
│ │ │
│ ├── Initialize mutex and condition variables │
│ │ │
│ ├── Create bounded queue (allocate, set capacity) │
│ │ │
│ ├── pthread_create(worker_0) │
│ │ └──► Worker 0: waiting on not_empty │
│ │ │
│ ├── pthread_create(worker_1) │
│ │ └──► Worker 1: waiting on not_empty │
│ │ │
│ └── pthread_create(worker_N) │
│ └──► Worker N: waiting on not_empty │
│ │
│ STEADY STATE OPERATION │
│ ─────────────────────── │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Producer(s) Queue Workers │ │
│ │ ────────── ───── ─────── │ │
│ │ │ │
│ │ submit(task1) ──────► [T1│T2│T3│ │ ] ──────► Worker 0 executes │ │
│ │ submit(task2) ──────► ──────► Worker 1 executes │ │
│ │ submit(task3) ──────► ──────► Worker 2 waiting │ │
│ │ │ │
│ │ [if queue full: producer blocks on not_full] │ │
│ │ [if queue empty: workers block on not_empty] │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ SHUTDOWN SEQUENCE │
│ ────────────────── │
│ │
│ Option 1: GRACEFUL (complete all queued work) │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ 1. Set shutdown flag = GRACEFUL │ │
│ │ 2. Stop accepting new submissions (return error) │ │
│ │ 3. Broadcast not_empty (wake all workers) │ │
│ │ 4. Workers: check shutdown AFTER queue empty │ │
│ │ 5. Workers: process remaining tasks, then exit │ │
│ │ 6. Main: join all worker threads │ │
│ │ 7. Main: destroy mutex/condvars, free queue │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
│ Option 2: IMMEDIATE (discard queued work) │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ 1. Set shutdown flag = IMMEDIATE │ │
│ │ 2. Broadcast not_empty AND not_full │ │
│ │ 3. Workers: check shutdown BEFORE dequeue attempt │ │
│ │ 4. Workers: exit immediately (current task finishes) │ │
│ │ 5. Main: join all worker threads │ │
│ │ 6. Main: drain queue (free remaining tasks) │ │
│ │ 7. Main: destroy mutex/condvars, free queue │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
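The graceful path above condenses into a self-contained mini-pool. This is a sketch, not the project's final API: names like `mini_pool_t` are placeholders and error handling is trimmed, but it shows the step ordering (set the flag, refuse new work, broadcast, drain, join):

```c
#include <pthread.h>
#include <stdlib.h>

enum { RUNNING, GRACEFUL };

typedef struct { void (*fn)(void *); void *arg; } task_t;

typedef struct {
    task_t *q;
    int cap, count, front, rear;
    int state;
    pthread_mutex_t lock;
    pthread_cond_t not_empty, not_full;
    pthread_t workers[8];
    int nworkers;
} mini_pool_t;

static void *mini_worker(void *p) {
    mini_pool_t *pool = p;
    for (;;) {
        pthread_mutex_lock(&pool->lock);
        while (pool->count == 0 && pool->state == RUNNING)
            pthread_cond_wait(&pool->not_empty, &pool->lock);
        if (pool->count == 0 && pool->state == GRACEFUL) {
            pthread_mutex_unlock(&pool->lock);
            return NULL;                       // queue drained: worker exits
        }
        task_t t = pool->q[pool->front];
        pool->front = (pool->front + 1) % pool->cap;
        pool->count--;
        pthread_cond_signal(&pool->not_full);  // a slot was freed
        pthread_mutex_unlock(&pool->lock);
        t.fn(t.arg);                           // run task outside the lock
    }
}

int mini_pool_start(mini_pool_t *pool, int nworkers, int cap) {
    if (nworkers < 1 || nworkers > 8) return -1;
    pool->q = malloc(sizeof(task_t) * (size_t)cap);
    if (!pool->q) return -1;
    pool->cap = cap;
    pool->count = pool->front = pool->rear = 0;
    pool->state = RUNNING;
    pool->nworkers = nworkers;
    pthread_mutex_init(&pool->lock, NULL);
    pthread_cond_init(&pool->not_empty, NULL);
    pthread_cond_init(&pool->not_full, NULL);
    for (int i = 0; i < nworkers; i++)
        pthread_create(&pool->workers[i], NULL, mini_worker, pool);
    return 0;
}

int mini_pool_submit(mini_pool_t *pool, void (*fn)(void *), void *arg) {
    pthread_mutex_lock(&pool->lock);
    while (pool->count == pool->cap && pool->state == RUNNING)
        pthread_cond_wait(&pool->not_full, &pool->lock);
    if (pool->state != RUNNING) {              // step 2: refuse new work
        pthread_mutex_unlock(&pool->lock);
        return -1;
    }
    pool->q[pool->rear] = (task_t){fn, arg};
    pool->rear = (pool->rear + 1) % pool->cap;
    pool->count++;
    pthread_cond_signal(&pool->not_empty);
    pthread_mutex_unlock(&pool->lock);
    return 0;
}

void mini_pool_shutdown_graceful(mini_pool_t *pool) {
    pthread_mutex_lock(&pool->lock);
    pool->state = GRACEFUL;                    // step 1: set the flag
    pthread_cond_broadcast(&pool->not_empty);  // step 3: wake all workers
    pthread_cond_broadcast(&pool->not_full);   // also unblock any producers
    pthread_mutex_unlock(&pool->lock);
    for (int i = 0; i < pool->nworkers; i++)   // step 6: join workers
        pthread_join(pool->workers[i], NULL);
    free(pool->q);                             // step 7: release the queue
}

// Tiny task for exercising the sketch.
static _Atomic int mini_done = 0;
static void mini_bump(void *arg) { (void)arg; mini_done++; }
```

Note the worker's exit test comes after the wait loop: a graceful worker only leaves once the queue is empty, which is exactly the "complete all queued work" guarantee.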
Bounded Buffer Implementation Strategies
┌────────────────────────────────────────────────────────────────────────────────┐
│ BOUNDED BUFFER IMPLEMENTATIONS │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ CIRCULAR ARRAY (Recommended for fixed-size queues) │
│ ────────────────────────────────────────────────── │
│ │
│ ┌───┬───┬───┬───┬───┬───┬───┬───┐ │
│ │ 5 │ 6 │ 7 │ │ │ │ 3 │ 4 │ capacity = 8 │
│ └───┴───┴───┴───┴───┴───┴───┴───┘ │
│ ▲ ▲ │
│ rear front │
│ │
│ count = 5, front = 6, rear = 3 │
│ │
│ Enqueue (at rear): │
│ buffer[rear] = item; │
│ rear = (rear + 1) % capacity; │
│ count++; │
│ │
│ Dequeue (from front): │
│ item = buffer[front]; │
│ front = (front + 1) % capacity; │
│ count--; │
│ │
│ Empty: count == 0 │
│ Full: count == capacity │
│ │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ LINKED LIST (For variable-size or unbounded queues) │
│ ────────────────────────────────────────────────── │
│ │
│ head ──► [task1|next] ──► [task2|next] ──► [task3|NULL] ◄── tail │
│ │
│ Enqueue (at tail): │
│ new_node->next = NULL; │
│ if (tail) tail->next = new_node; │
│ tail = new_node; │
│ if (!head) head = new_node; │
│ count++; │
│ │
│ Dequeue (from head): │
│ node = head; │
│ head = head->next; │
│ if (!head) tail = NULL; │
│ count--; │
│ return node; │
│ │
│ Pros: No fixed capacity, dynamic growth │
│ Cons: Malloc overhead per item, cache unfriendly │
│ │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ COMPARISON: │
│ │
│ ┌────────────────────────────────────────────────────────────────────────┐ │
│ │ Aspect Circular Array Linked List │ │
│ ├────────────────────────────────────────────────────────────────────────┤ │
│ │ Memory Fixed, preallocated Dynamic per item │ │
│ │ Cache friendliness Excellent Poor │ │
│ │ Allocation cost None at runtime malloc() per enqueue │ │
│ │ Bounded Yes (natural) Must track count │ │
│ │ Implementation Simpler More complex │ │
│ │ Best for Thread pools Unbounded queues │ │
│ └────────────────────────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
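The circular-array pseudocode, made concrete as a single-threaded ring; synchronization is deliberately omitted here (the producer-consumer sections add it), and `ring_t` and friends are illustrative names:

```c
#include <stdlib.h>

typedef struct {
    void **buf;
    int cap, count, front, rear;
} ring_t;

// Allocate a ring of fixed capacity; returns 0 on success.
int ring_init(ring_t *r, int cap) {
    r->buf = malloc(sizeof(void *) * (size_t)cap);
    if (!r->buf) return -1;
    r->cap = cap;
    r->count = r->front = r->rear = 0;
    return 0;
}

// Enqueue at rear; returns -1 when full (count == capacity).
int ring_push(ring_t *r, void *item) {
    if (r->count == r->cap) return -1;
    r->buf[r->rear] = item;
    r->rear = (r->rear + 1) % r->cap;
    r->count++;
    return 0;
}

// Dequeue from front; returns NULL when empty (count == 0).
void *ring_pop(ring_t *r) {
    if (r->count == 0) return NULL;
    void *item = r->buf[r->front];
    r->front = (r->front + 1) % r->cap;
    r->count--;
    return item;
}
```

The modulo arithmetic makes the indices wrap, so the same fixed array serves indefinitely with no allocation on the hot path.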
Semaphores vs Condition Variables
┌────────────────────────────────────────────────────────────────────────────────┐
│ SEMAPHORES vs CONDITION VARIABLES │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ SEMAPHORE │
│ ─────────── │
│ - Integer counter (≥ 0) │
│ - Two operations: wait (P/down) and signal (V/up) │
│ - wait: if count > 0, decrement; else block │
│ - signal: increment count, wake one waiter │
│ - State is "remembered" (signal without waiter still increments) │
│ │
│ CONDITION VARIABLE │
│ ────────────────────── │
│ - No state of its own │
│ - Always used with a mutex + shared predicate │
│ - wait: atomically unlock + sleep; relock on wake │
│ - signal: wake one (or all) waiters │
│ - Signal without waiter is a no-op (lost) │
│ │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ PRODUCER-CONSUMER WITH SEMAPHORES: │
│ ────────────────────────────────────── │
│ │
│ sem_t mutex; // Binary semaphore for mutual exclusion │
│ sem_t slots; // Counting: empty slots (init = capacity) │
│ sem_t items; // Counting: filled slots (init = 0) │
│ │
│ Producer: Consumer: │
│ ────────── ────────── │
│ sem_wait(&slots); // Wait for sem_wait(&items); // Wait for │
│ // empty slot // item │
│ sem_wait(&mutex); // Lock sem_wait(&mutex); // Lock │
│ enqueue(item); item = dequeue(); │
│ sem_post(&mutex); // Unlock sem_post(&mutex); // Unlock │
│ sem_post(&items); // Signal sem_post(&slots); // Signal │
│ // item ready // slot freed │
│ │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ PRODUCER-CONSUMER WITH CONDITION VARIABLES: │
│ ─────────────────────────────────────────────── │
│ │
│ pthread_mutex_t lock; │
│ pthread_cond_t not_empty; // Signaled when count > 0 │
│ pthread_cond_t not_full; // Signaled when count < capacity │
│ int count; │
│ │
│ Producer: Consumer: │
│ ────────── ────────── │
│ pthread_mutex_lock(&lock); pthread_mutex_lock(&lock); │
│ while (count == capacity) while (count == 0) │
│ pthread_cond_wait( pthread_cond_wait( │
│ ¬_full, &lock); ¬_empty, &lock); │
│ enqueue(item); item = dequeue(); │
│ pthread_cond_signal(¬_empty); pthread_cond_signal(¬_full); │
│ pthread_mutex_unlock(&lock); pthread_mutex_unlock(&lock); │
│ │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ WHICH TO CHOOSE: │
│ ───────────────── │
│ │
│ Use SEMAPHORES when: │
│ - Counting resources (connection pool, worker count) │
│ - Cross-process synchronization (named semaphores) │
│ - State must be "remembered" │
│ │
│ Use CONDITION VARIABLES when: │
│ - Complex predicates (more than just counting) │
│ - Need to hold lock across multiple operations │
│ - More flexible wakeup patterns (broadcast) │
│ - Already using mutexes for other protection │
│ │
│ For thread pools: Either works; condvars more common for flexibility │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
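The semaphore variant above, as compilable C. One caveat for this project's portability goal: unnamed semaphores (`sem_init`) work on Linux but are not supported on macOS, so treat this as a Linux-only sketch with illustrative names:

```c
#include <pthread.h>
#include <semaphore.h>

#define CAP 4
static int buf[CAP], in = 0, out = 0;
static sem_t slots, items, mtx;

void pc_init(void) {
    sem_init(&slots, 0, CAP);   // empty slots, starts at capacity
    sem_init(&items, 0, 0);     // filled slots, starts at zero
    sem_init(&mtx, 0, 1);       // binary semaphore as a mutex
}

// Producer: wait for a slot, insert under the "mutex", announce an item.
void pc_put(int v) {
    sem_wait(&slots);
    sem_wait(&mtx);
    buf[in] = v;
    in = (in + 1) % CAP;
    sem_post(&mtx);
    sem_post(&items);
}

// Consumer: wait for an item, remove under the "mutex", announce a slot.
int pc_get(void) {
    sem_wait(&items);
    sem_wait(&mtx);
    int v = buf[out];
    out = (out + 1) % CAP;
    sem_post(&mtx);
    sem_post(&slots);
    return v;
}
```

Notice there is no while-loop re-check here: the semaphore counters *are* the predicate, which is the "state is remembered" property the comparison describes.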
Work Stealing (Advanced Concept)
┌────────────────────────────────────────────────────────────────────────────────┐
│ WORK STEALING ARCHITECTURE │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ STANDARD THREAD POOL (Single shared queue): │
│ ─────────────────────────────────────────── │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ SHARED QUEUE │ │
│ │ ┌────┬────┬────┬────┬────┬────┬────┬────┐ │ │
│ │ │Task│Task│Task│Task│Task│Task│Task│Task│ ◄── Contention! │ │
│ │ └────┴────┴────┴────┴────┴────┴────┴────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ ▲ ▲ ▲ ▲ │
│ │ │ │ │ │
│ Worker 0 Worker 1 Worker 2 Worker 3 │
│ │
│ Problem: All workers contend for the same lock │
│ At high worker counts, lock becomes bottleneck │
│ │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ WORK STEALING (Per-worker queues): │
│ ────────────────────────────────── │
│ │
│ Worker 0 Queue Worker 1 Queue Worker 2 Queue Worker 3 Queue │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │Task│Task│ │ │Task│ │ │ │ │Task│Task│ │ │
│ └────────────┘ └────────────┘ └────────────┘ └────────────┘ │
│ ▲ ▲ ▲ ▲ │
│ │ │ │ │ │
│ Worker 0 Worker 1 Worker 2 Worker 3 │
│ (working) (working) (idle-steals!) (working) │
│ │ │
│ └──► Steal from Worker 0 tail │
│ │
│ DEQUE (Double-Ended Queue) per worker: │
│ ───────────────────────────────────── │
│ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ HEAD ◄── Owner pushes/pops here (LIFO for locality) │ │
│ │ ┌────┬────┬────┬────┬────┬────┐ │ │
│ │ │ T1 │ T2 │ T3 │ T4 │ T5 │ T6 │ │ │
│ │ └────┴────┴────┴────┴────┴────┘ │ │
│ │ TAIL ◄── Thieves steal from here (FIFO - oldest tasks) │ │
│ │ │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
│ Benefits: │
│ ───────── │
│ - No lock contention on normal path (owner has exclusive access) │
│ - Stealing is rare, only when idle │
│ - Better cache locality (owner reuses recently pushed tasks) │
│ - Automatic load balancing │
│ │
│ Implementation complexity: HIGH (need lock-free stealing) │
│ When to use: Very high throughput systems (Go scheduler, Tokio, Rayon) │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
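Production work-stealing deques (e.g. Chase-Lev) are lock-free, which is where the "HIGH complexity" comes from. A mutex-guarded sketch is enough to show the access pattern the diagram describes: the owner pushes and pops at one end (LIFO), thieves take the oldest task from the other end (FIFO). All names here are illustrative:

```c
#include <pthread.h>

#define WS_CAP 64

typedef struct {
    void *tasks[WS_CAP];
    int head, tail;          // head: owner's end; tail: thieves' end
    pthread_mutex_t lock;    // real deques avoid this lock entirely
} wsdeque_t;

void ws_init(wsdeque_t *d) {
    d->head = d->tail = 0;
    pthread_mutex_init(&d->lock, NULL);
}

// Owner pushes at the head (LIFO for cache locality).
int ws_push(wsdeque_t *d, void *t) {
    pthread_mutex_lock(&d->lock);
    if (d->head - d->tail == WS_CAP) { pthread_mutex_unlock(&d->lock); return -1; }
    d->tasks[d->head % WS_CAP] = t;
    d->head++;
    pthread_mutex_unlock(&d->lock);
    return 0;
}

// Owner pops the most recently pushed task; NULL when empty.
void *ws_pop(wsdeque_t *d) {
    void *t = NULL;
    pthread_mutex_lock(&d->lock);
    if (d->head > d->tail) t = d->tasks[--d->head % WS_CAP];
    pthread_mutex_unlock(&d->lock);
    return t;
}

// Thief steals the oldest task from the tail; NULL when empty.
void *ws_steal(wsdeque_t *d) {
    void *t = NULL;
    pthread_mutex_lock(&d->lock);
    if (d->head > d->tail) t = d->tasks[d->tail++ % WS_CAP];
    pthread_mutex_unlock(&d->lock);
    return t;
}
```

The LIFO/FIFO split is the key design choice: the owner reuses its freshest (cache-warm) tasks, while thieves take old tasks that are likely large, unstarted units of work.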
Choosing the Right Number of Threads
┌────────────────────────────────────────────────────────────────────────────────┐
│ THREAD POOL SIZING │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ RULE OF THUMB: │
│ ─────────────── │
│ │
│ CPU-bound work: threads = num_cpus │
│ I/O-bound work: threads = num_cpus * (1 + wait_time/compute_time) │
│ Mixed work: threads = num_cpus * 2 (then measure and adjust) │
│ │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ CPU-BOUND WORK (e.g., image processing, crypto): │
│ ───────────────────────────────────────────────── │
│ │
│ Threads CPU Usage Throughput Context Switches │
│ ──────── ───────── ────────── ──────────────── │
│ 4 (=cpus) 100% Optimal Minimal │
│ 8 100% Same Increased (wasted) │
│ 16 100% Worse! High (thrashing) │
│ │
│ Why more threads hurts: Context switch overhead, cache pollution │
│ │
│ I/O-BOUND WORK (e.g., web requests, database queries): │
│ ─────────────────────────────────────────────────────── │
│ │
│ Example: Tasks spend 90ms waiting on I/O, 10ms computing │
│ Wait ratio = 90/10 = 9 │
│ Optimal threads ≈ 4 * (1 + 9) = 40 threads │
│ │
│ Threads CPU Usage Throughput Note │
│ ──────── ───────── ────────── ──── │
│ 4 10% Low Waiting on I/O │
│ 20 50% 5x better Still waiting sometimes │
│ 40 ~100% ~10x better Optimal │
│ 100 ~100% Same Diminishing returns │
│ 1000 ~100% Worse Thread overhead dominates │
│ │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ LITTLE'S LAW: │
│ ───────────── │
│ │
│ L = λ * W │
│ │
│ L = average number of items in system (threads needed) │
│ λ = arrival rate (requests per second) │
│ W = average time per request │
│ │
│ Example: │
│ - 1000 requests/second │
│ - Each request takes 50ms average │
│ - L = 1000 * 0.05 = 50 threads minimum │
│ │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ PRACTICAL ADVICE: │
│ ───────────────── │
│ │
│ 1. Start with 2 * num_cpus │
│ 2. Measure under realistic load │
│ 3. Watch for: │
│ - CPU saturation (good if I/O-bound work) │
│ - Queue growth (need more threads or slower rate) │
│ - Latency percentiles (P99 matters!) │
│ 4. Adjust and re-measure │
│ 5. Consider making it configurable at runtime │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
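The rules of thumb and Little's law condense into two small helpers. The names are illustrative and the results are starting points for measurement, not guarantees:

```c
// I/O-bound rule of thumb: threads = ncpus * (1 + wait/compute).
// A non-positive compute time is treated as pure CPU-bound work.
int suggest_threads(int ncpus, double wait_ms, double compute_ms) {
    if (compute_ms <= 0) return ncpus;
    return (int)(ncpus * (1.0 + wait_ms / compute_ms));
}

// Little's law: L = lambda * W, i.e. concurrent requests in flight =
// arrival rate * average time per request (given here in milliseconds).
int littles_law_threads(double requests_per_sec, double avg_ms) {
    return (int)(requests_per_sec * avg_ms / 1000.0 + 0.5);  // rounded
}
```

Plugging in the section's numbers: 4 CPUs with a 90ms/10ms wait-to-compute split suggests 40 threads, and 1000 requests/second at 50ms each puts at least 50 requests in flight.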
Project Specification
What You Will Build
A reusable thread pool library with the following components:
- Bounded work queue: Thread-safe producer-consumer buffer with blocking operations
- Worker pool: Fixed number of threads that pull and execute tasks
- Task abstraction: Function pointer + argument pairs
- Lifecycle management: Create, submit, wait, shutdown, destroy
- Statistics: Track throughput, queue depth, latency (optional but recommended)
API Surface
```c
#include <stdint.h>

// Create a new thread pool
// Returns NULL on error
threadpool_t *threadpool_create(int num_threads, int queue_capacity);

// Submit a task to the pool
// Blocks if queue is full (backpressure)
// Returns 0 on success, -1 on error (pool shutting down)
int threadpool_submit(threadpool_t *pool, void (*function)(void *), void *arg);

// Wait for all queued tasks to complete and workers to become idle
// Does NOT shut down the pool - more work can be submitted afterward
void threadpool_wait(threadpool_t *pool);

// Graceful shutdown: complete all queued tasks, then terminate workers
// Immediate shutdown: terminate workers, discard remaining tasks
void threadpool_shutdown(threadpool_t *pool, int graceful);

// Clean up all resources (must call shutdown first)
void threadpool_destroy(threadpool_t *pool);

// Optional: Get statistics
typedef struct {
    uint64_t tasks_submitted;
    uint64_t tasks_completed;
    int queue_depth;
    int active_workers;
    double avg_wait_time_ms;
    double avg_exec_time_ms;
} threadpool_stats_t;

int threadpool_get_stats(threadpool_t *pool, threadpool_stats_t *stats);
```
Example Usage
```c
#include "threadpool.h"
#include <stdio.h>
#include <unistd.h>  // usleep

void print_number(void *arg) {
    int *num = (int *)arg;
    printf("Processing: %d\n", *num);
    // Simulate work
    usleep(10000); // 10ms
}

int main(void) {
    // Create pool with 4 workers and queue capacity of 64
    threadpool_t *pool = threadpool_create(4, 64);
    if (!pool) {
        fprintf(stderr, "Failed to create pool\n");
        return 1;
    }

    // Submit 100 tasks
    int numbers[100];
    for (int i = 0; i < 100; i++) {
        numbers[i] = i;
        if (threadpool_submit(pool, print_number, &numbers[i]) != 0) {
            fprintf(stderr, "Failed to submit task %d\n", i);
        }
    }

    // Wait for all tasks to complete
    threadpool_wait(pool);

    // Clean shutdown
    threadpool_shutdown(pool, 1); // graceful = 1
    threadpool_destroy(pool);
    return 0;
}
```
Functional Requirements
- Thread-safe: All operations must be safe for concurrent access
- Correct blocking: Producers block when queue full, consumers block when empty
- No lost tasks: Every submitted task must be executed (in graceful shutdown)
- Clean shutdown: No deadlocks, no resource leaks, all threads terminate
- Backpressure: When overwhelmed, slow down producers (don’t drop tasks)
Non-Functional Requirements
- Efficiency: Minimize lock contention, avoid busy-waiting
- Robustness: Handle edge cases (NULL pointers, double-destroy, etc.)
- Debuggability: Support logging/assertions in debug mode
- Portability: Work on Linux and macOS
Real World Outcome
When you complete this project, here’s exactly what you’ll see:
Basic Demonstration
$ ./threadpool --workers=4 --demo
================================================================================
THREAD POOL DEMONSTRATION
================================================================================
[INIT] Creating pool with 4 worker threads
[INIT] Work queue capacity: 64 tasks
[WORKER-0] Started, waiting for work...
[WORKER-1] Started, waiting for work...
[WORKER-2] Started, waiting for work...
[WORKER-3] Started, waiting for work...
[SUBMIT] Task 1: compute_fibonacci(40)
[SUBMIT] Task 2: compute_fibonacci(35)
[SUBMIT] Task 3: compress_file("data.bin")
[SUBMIT] Task 4: compute_factorial(20)
[SUBMIT] Task 5: hash_password("secret")
[WORKER-0] Executing task 1: compute_fibonacci(40)
[WORKER-1] Executing task 2: compute_fibonacci(35)
[WORKER-2] Executing task 3: compress_file("data.bin")
[WORKER-3] Executing task 4: compute_factorial(20)
[WORKER-1] Completed task 2 in 89ms (result: 9227465)
[WORKER-1] Executing task 5: hash_password("secret")
[WORKER-3] Completed task 4 in 12ms (result: 2432902008176640000)
[WORKER-1] Completed task 5 in 156ms
[WORKER-2] Completed task 3 in 423ms (compressed 1.2MB -> 340KB)
[WORKER-0] Completed task 1 in 1247ms (result: 102334155)
================================================================================
POOL STATISTICS
================================================================================
Total tasks submitted: 5
Total tasks completed: 5
Average wait time: 34ms
Average execution time: 385ms
Queue high watermark: 4/64
Stress Test Mode
$ ./threadpool --workers=2 --stress-test --tasks=10000
================================================================================
STRESS TEST MODE
================================================================================
[CONFIG] Workers: 2, Tasks: 10000, Queue size: 256
[PROGRESS] 1000/10000 tasks (10.0%) - 847 tasks/sec
[PROGRESS] 2000/10000 tasks (20.0%) - 892 tasks/sec
[PROGRESS] 5000/10000 tasks (50.0%) - 921 tasks/sec
[PROGRESS] 10000/10000 tasks (100.0%) - 934 tasks/sec
[RESULT] All 10000 tasks completed successfully
[RESULT] Total time: 10.71s
[RESULT] Throughput: 934 tasks/sec
[RESULT] No deadlocks detected
[RESULT] No tasks lost during shutdown
Graceful Shutdown Test
$ ./threadpool --workers=4 --graceful-shutdown-test
================================================================================
GRACEFUL SHUTDOWN TEST
================================================================================
[TEST] Submitting 100 long-running tasks...
[TEST] Requesting shutdown while 87 tasks pending...
[POOL] Shutdown requested - completing in-flight tasks
[POOL] Worker-0 finishing current task, then exiting
[POOL] Worker-1 finishing current task, then exiting
[POOL] Worker-2 completing request, then exit
[POOL] Worker-3 completing request, then exit
[POOL] Draining remaining 83 queued tasks...
[POOL] All workers joined
[RESULT] PASS - All 100 tasks completed
[RESULT] PASS - No memory leaks (valgrind clean)
[RESULT] PASS - Shutdown completed in 2.3s
Integration with HTTP Server
$ ./http-server --port 8080 --pool-size 8 --queue-size 128
HTTP Server starting...
Thread pool: 8 workers, queue capacity 128
Listening on port 8080...
# In another terminal:
$ ab -n 10000 -c 100 http://localhost:8080/
Completed 10000 requests
Time taken: 2.34 seconds
Requests per second: 4273.50 [#/sec]
Concurrency Level: 100
Failed requests: 0
Queue depth distribution:
0-10: 45% of time
11-50: 40% of time
51-100: 14% of time
101-128: 1% of time (backpressure applied)
Solution Architecture
High-Level Design
┌────────────────────────────────────────────────────────────────────────────────┐
│ THREAD POOL ARCHITECTURE │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ PUBLIC API │ │
│ │ │ │
│ │ threadpool_create() threadpool_submit() threadpool_shutdown() │ │
│ │ threadpool_wait() threadpool_destroy() threadpool_get_stats() │ │
│ └───────────────────────────────┬─────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ THREAD POOL CORE │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────────────────┐ │ │
│ │ │ SYNCHRONIZATION │ │ │
│ │ │ │ │ │
│ │ │ pthread_mutex_t pool_lock; // Protects all pool state │ │ │
│ │ │ pthread_cond_t not_empty; // Queue has items │ │ │
│ │ │ pthread_cond_t not_full; // Queue has space │ │ │
│ │ │ pthread_cond_t all_idle; // All workers idle (for wait) │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────────────────┐ │ │
│ │ │ WORK QUEUE (Bounded Buffer) │ │ │
│ │ │ │ │ │
│ │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │
│ │ │ │ CIRCULAR ARRAY OF TASK POINTERS │ │ │ │
│ │ │ │ ┌──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┐ │ │ │ │
│ │ │ │ │Task* │Task* │Task* │ NULL │ NULL │ NULL │Task* │Task* │ │ │ │ │
│ │ │ │ └──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┘ │ │ │ │
│ │ │ │ ▲ ▲ │ │ │ │
│ │ │ │ rear front │ │ │ │
│ │ │ └────────────────────────────────────────────────────────────┘ │ │ │
│ │ │ │ │ │
│ │ │ int capacity; // Max tasks in queue │ │ │
│ │ │ int count; // Current tasks in queue │ │ │
│ │ │ int front, rear; // Circular buffer indices │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────────────────┐ │ │
│ │ │ WORKER THREADS │ │ │
│ │ │ │ │ │
│ │ │ pthread_t workers[N]; // Thread handles │ │ │
│ │ │ int num_workers; // Pool size │ │ │
│ │ │ int active_count; // Currently executing task │ │ │
│ │ │ │ │ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │
│ │ │ │ Worker 0 │ │ Worker 1 │ │ Worker 2 │ │ Worker N │ │ │ │
│ │ │ │ loop: │ │ loop: │ │ loop: │ │ loop: │ │ │ │
│ │ │ │ dequeue │ │ dequeue │ │ dequeue │ │ dequeue │ │ │ │
│ │ │ │ execute │ │ execute │ │ execute │ │ execute │ │ │ │
│ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────────────────┐ │ │
│ │ │ POOL STATE │ │ │
│ │ │ │ │ │
│ │ │ int shutdown; // 0=running, 1=graceful, 2=immediate │ │ │
│ │ │ int tasks_in_flight; // Submitted but not completed │ │ │
│ │ │ │ │ │
│ │ │ // Statistics (optional) │ │ │
│ │ │ uint64_t total_submitted; │ │ │
│ │ │ uint64_t total_completed; │ │ │
│ │ │ uint64_t total_wait_ns; │ │ │
│ │ │ uint64_t total_exec_ns; │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
Data Structures
/* Task structure - represents a unit of work */
typedef struct task {
void (*function)(void *arg); // Function to execute
void *arg; // Argument to pass
// Optional for linked list implementation:
struct task *next;
} task_t;
/* Thread pool structure */
typedef struct threadpool {
/* Synchronization */
pthread_mutex_t lock; // Protects all fields
pthread_cond_t not_empty; // Signaled when queue becomes non-empty
pthread_cond_t not_full; // Signaled when queue has space
pthread_cond_t all_idle; // Signaled when active_count reaches 0
/* Work queue (circular buffer) */
task_t **queue; // Array of task pointers
int capacity; // Maximum queue size
int count; // Current items in queue
int front; // Index for next dequeue
int rear; // Index for next enqueue
/* Workers */
pthread_t *workers; // Array of worker thread handles
int num_workers; // Number of worker threads
int active_count; // Workers currently executing a task
/* State */
int shutdown; // 0=running, 1=graceful, 2=immediate
int tasks_in_flight; // Submitted but not yet completed
/* Statistics (optional) */
uint64_t total_submitted;
uint64_t total_completed;
// Add more as needed
} threadpool_t;
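The structures above cover state but not lifecycle. Below is one possible sketch of `threadpool_create()` and `threadpool_destroy()` to complement them. This is a hedged illustration, not the reference implementation: the struct is repeated here (minus the `all_idle` and statistics fields) so the sketch compiles standalone, the worker is a stub that only waits for shutdown (the real dequeue/execute loop appears later in "Hints in Layers"), and error handling is abbreviated.

```c
#include <pthread.h>
#include <stdlib.h>

typedef struct task { void (*function)(void *); void *arg; } task_t;

typedef struct threadpool {
    pthread_mutex_t lock;
    pthread_cond_t  not_empty;
    pthread_cond_t  not_full;
    task_t **queue;
    int capacity, count, front, rear;
    pthread_t *workers;
    int num_workers;
    int shutdown;               /* 0=running, 1=graceful, 2=immediate */
} threadpool_t;

/* Placeholder worker: only waits for shutdown. */
static void *worker_stub(void *arg) {
    threadpool_t *pool = arg;
    pthread_mutex_lock(&pool->lock);
    while (!pool->shutdown)
        pthread_cond_wait(&pool->not_empty, &pool->lock);
    pthread_mutex_unlock(&pool->lock);
    return NULL;
}

threadpool_t *threadpool_create(int num_workers, int capacity) {
    threadpool_t *pool = calloc(1, sizeof(*pool));
    if (!pool) return NULL;
    pool->capacity = capacity;
    pool->queue   = calloc((size_t)capacity, sizeof(task_t *));
    pool->workers = calloc((size_t)num_workers, sizeof(pthread_t));
    pthread_mutex_init(&pool->lock, NULL);
    pthread_cond_init(&pool->not_empty, NULL);
    pthread_cond_init(&pool->not_full, NULL);
    for (int i = 0; i < num_workers; i++) {
        if (pthread_create(&pool->workers[i], NULL, worker_stub, pool) != 0)
            break;              /* real code: roll back already-started workers */
        pool->num_workers++;
    }
    return pool;
}

void threadpool_destroy(threadpool_t *pool) {
    pthread_mutex_lock(&pool->lock);
    pool->shutdown = 2;                        /* immediate */
    pthread_cond_broadcast(&pool->not_empty);  /* wake every sleeping worker */
    pthread_mutex_unlock(&pool->lock);
    for (int i = 0; i < pool->num_workers; i++)
        pthread_join(pool->workers[i], NULL);
    pthread_mutex_destroy(&pool->lock);
    pthread_cond_destroy(&pool->not_empty);
    pthread_cond_destroy(&pool->not_full);
    free(pool->queue);
    free(pool->workers);
    free(pool);
}
```

Note that destroy here implies an immediate shutdown; a production API usually separates `shutdown()` from `destroy()` as the public API section above suggests.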
Worker Thread Algorithm
┌────────────────────────────────────────────────────────────────────────────────┐
│ WORKER THREAD PSEUDOCODE │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ worker_thread(pool): │
│ loop forever: │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ LOCK(pool->lock) │ │
│ │ │ │
│ │ // Wait for work or shutdown │ │
│ │ while (pool->count == 0 AND NOT pool->shutdown): │ │
│ │ WAIT(pool->not_empty, pool->lock) │ │
│ │ │ │
│ │ // Check if we should exit │ │
│ │ if (pool->shutdown == IMMEDIATE): │ │
│ │ UNLOCK(pool->lock) │ │
│ │ return │ │
│ │ │ │
│ │ if (pool->shutdown == GRACEFUL AND pool->count == 0): │ │
│ │ UNLOCK(pool->lock) │ │
│ │ return │ │
│ │ │ │
│ │ // Dequeue a task │ │
│ │ task = pool->queue[pool->front] │ │
│ │ pool->front = (pool->front + 1) % pool->capacity │ │
│ │ pool->count-- │ │
│ │ pool->active_count++ │ │
│ │ │ │
│ │ // Signal that there's now space in the queue │ │
│ │ SIGNAL(pool->not_full) │ │
│ │ │ │
│ │ UNLOCK(pool->lock) │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ // Execute the task (NO LOCK HELD!) │ │
│ │ task->function(task->arg) │ │
│ │ free(task) │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ LOCK(pool->lock) │ │
│ │ │ │
│ │ pool->active_count-- │ │
│ │ pool->tasks_in_flight-- │ │
│ │ pool->total_completed++ │ │
│ │ │ │
│ │ // Signal if all workers are now idle │ │
│ │ if (pool->active_count == 0 AND pool->count == 0): │ │
│ │ SIGNAL(pool->all_idle) │ │
│ │ │ │
│ │ UNLOCK(pool->lock) │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
Submit Algorithm
┌────────────────────────────────────────────────────────────────────────────────┐
│ SUBMIT TASK PSEUDOCODE │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ threadpool_submit(pool, function, arg): │
│ // Allocate task │
│ task = malloc(sizeof(task_t)) │
│ task->function = function │
│ task->arg = arg │
│ │
│ LOCK(pool->lock) │
│ │
│ // Wait if queue is full (backpressure) │
│ while (pool->count == pool->capacity AND NOT pool->shutdown): │
│ WAIT(pool->not_full, pool->lock) │
│ │
│ // Check if pool is shutting down │
│ if (pool->shutdown): │
│ UNLOCK(pool->lock) │
│ free(task) │
│ return ERROR_SHUTDOWN │
│ │
│ // Enqueue the task │
│ pool->queue[pool->rear] = task │
│ pool->rear = (pool->rear + 1) % pool->capacity │
│ pool->count++ │
│ pool->tasks_in_flight++ │
│ pool->total_submitted++ │
│ │
│ // Signal ONE waiting worker │
│ SIGNAL(pool->not_empty) │
│ │
│ UNLOCK(pool->lock) │
│ return SUCCESS │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
Shutdown Algorithm
┌────────────────────────────────────────────────────────────────────────────────┐
│ SHUTDOWN PSEUDOCODE │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ threadpool_shutdown(pool, graceful): │
│ LOCK(pool->lock) │
│ │
│ // Set shutdown flag │
│ pool->shutdown = graceful ? GRACEFUL : IMMEDIATE │
│ │
│ // Wake ALL waiting workers │
│ BROADCAST(pool->not_empty) │
│ │
│ // Wake any blocked submitters │
│ BROADCAST(pool->not_full) │
│ │
│ UNLOCK(pool->lock) │
│ │
│ // Wait for all workers to terminate │
│ for i in 0..pool->num_workers: │
│ pthread_join(pool->workers[i], NULL) │
│ │
│ // If immediate shutdown, drain remaining queue │
│ if NOT graceful: │
│ while pool->count > 0: │
│ task = dequeue(pool) │
│ // Optionally call cancellation callback │
│ free(task) │
│ │
│ threadpool_wait(pool): │
│ LOCK(pool->lock) │
│ │
│ // Wait until queue is empty AND no active workers │
│ while (pool->count > 0 OR pool->active_count > 0): │
│ WAIT(pool->all_idle, pool->lock) │
│ │
│ UNLOCK(pool->lock) │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
Implementation Guide
The Core Question You’re Answering
“How do you safely coordinate multiple threads that share a work queue, ensuring no race conditions, no lost work, and clean shutdown?”
This project forces you to confront the fundamental challenges of concurrent programming:
- What happens when two threads try to enqueue simultaneously?
- How do workers efficiently wait for work without burning CPU?
- When shutdown is requested, how do you ensure all queued work completes?
- How do you avoid the subtle bugs that cause deadlocks or lost wakeups?
Concepts You Must Understand First
Before starting implementation, verify your understanding:
| Concept | Why It Matters | Where to Learn |
|---|---|---|
| Mutex fundamentals | You’ll use locks to protect shared state | CS:APP 12.5.4 |
| Condition variables | The “wait and signal” pattern for efficient blocking | CS:APP 12.5.5 |
| Producer-consumer pattern | The bounded buffer is the core of your design | OSTEP Ch. 30 |
| Thread lifecycle | Creation, termination, joining, detaching | TLPI Ch. 29 |
| Memory visibility | Why shared data needs synchronization | CS:APP 12.5.1 |
| Deadlock prevention | Four conditions and how to avoid them | OSTEP Ch. 32 |
Self-Assessment Questions (answer these before coding):
- What does `pthread_cond_wait()` do atomically?
- Why must you hold the mutex when calling `pthread_cond_wait()`?
- What’s the difference between `pthread_cond_signal()` and `pthread_cond_broadcast()`?
- Why must you use a `while` loop around `pthread_cond_wait()`, not an `if`?
- What happens if you signal a condition variable with no waiters?
- How do you ensure a thread exits cleanly without leaking resources?
Questions to Guide Your Design
Work through these BEFORE writing code:
- Queue implementation: Will you use a circular array or linked list? What are the trade-offs?
- Blocking behavior: When a producer tries to submit to a full queue, should it block indefinitely, return an error, or have a timeout?
- Shutdown semantics: Should shutdown complete queued work (graceful) or discard it (immediate)? How do you signal workers to exit?
- Worker termination: Should workers check the shutdown flag before or after dequeuing? What’s the difference?
- Signal vs broadcast: When should you use `pthread_cond_signal()` vs `pthread_cond_broadcast()`?
- Thundering herd: If you use broadcast, how do you handle multiple workers waking for one item?
- Task memory: Who allocates task structures? Who frees them? When?
- Worker crash: If a worker thread crashes, should the pool detect this and spawn a replacement?
Thinking Exercise
Before writing code, trace through this scenario by hand:
// Initial state: queue is EMPTY, pool has 2 workers (both waiting on not_empty)
// Thread A (producer):
pthread_mutex_lock(&pool->lock);
// A acquires lock
enqueue(&pool->queue, task1);
// A signals condition variable
pthread_cond_signal(&pool->not_empty);
pthread_mutex_unlock(&pool->lock);
// Meanwhile, Thread B (producer) tries to submit:
pthread_mutex_lock(&pool->lock); // <-- What happens here?
// Worker-0 (waiting on cond var):
// wakes up from pthread_cond_wait()
// <-- What must Worker-0 do before accessing the queue?
// Worker-1 (also waiting):
// <-- Should Worker-1 wake up? What does it do?
Draw a timeline showing which thread holds the mutex at each moment. What if you used pthread_cond_broadcast() instead of pthread_cond_signal()?
Development Environment Setup
# Required packages (Ubuntu/Debian)
sudo apt-get install build-essential gdb valgrind
# For ThreadSanitizer (requires clang or recent gcc)
# Compile with: gcc -fsanitize=thread -g ...
# Create project structure
mkdir -p threadpool/{src,include,tests}
cd threadpool
Project Structure
threadpool/
├── include/
│ └── threadpool.h # Public API
├── src/
│ ├── threadpool.c # Core implementation
│ └── main.c # Demo/test driver
├── tests/
│ ├── test_basic.c # Unit tests
│ ├── test_stress.c # Stress tests
│ └── test_shutdown.c # Shutdown tests
├── Makefile
└── README.md
Implementation Phases
Phase 1: Data Structures and Initialization (Day 1)
- Define all structures
- Implement `threadpool_create()`
- Implement basic queue operations (enqueue/dequeue)
- Start worker threads (just have them print and exit)
Phase 2: Worker Loop (Days 2-3)
- Implement the worker thread function
- Add proper waiting on condition variable
- Handle wakeup and task execution
- Test with simple tasks
Phase 3: Submit with Backpressure (Days 4-5)
- Implement `threadpool_submit()`
- Add blocking when queue is full
- Test with producers submitting faster than workers consume
Phase 4: Shutdown Choreography (Days 6-7)
- Implement graceful shutdown
- Implement immediate shutdown
- Implement `threadpool_wait()`
- Test shutdown under various conditions
Phase 5: Testing and Debugging (Days 8-10)
- Write comprehensive tests
- Run under helgrind/ThreadSanitizer
- Fix any race conditions found
- Performance testing
The Interview Questions They’ll Ask
- “What’s the difference between a mutex and a semaphore? When would you use each?”
- Expected answer: Mutex is for mutual exclusion (one thread at a time), semaphore is for counting resources. Use mutex for protecting shared data, semaphore for limiting concurrent access to N resources. A binary semaphore is similar to a mutex but has different ownership semantics (any thread can signal, only owner should unlock mutex).
- “Explain why this thread pool implementation might deadlock.” (They’ll show buggy code)
- Expected answer: Look for: lock ordering violations, missing unlock on error paths, waiting on condition while holding multiple locks, or joining a thread that’s waiting on a lock you hold.
- “How would you implement work stealing between thread pool workers?”
- Expected answer: Each worker has its own deque. Workers push/pop from their own deque (LIFO for cache locality). When empty, steal from the tail of another worker’s deque. Requires lock-free or fine-grained locking for the stealing operation.
- “What’s the spurious wakeup problem and how do you handle it?”
- Expected answer: Condition variable wait can return even when no signal was sent. Always wrap `pthread_cond_wait` in a `while` loop that rechecks the condition, not an `if` statement.
- “How do you choose the optimal number of threads for a thread pool?”
- Expected answer: For CPU-bound work: number of cores. For I/O-bound work: higher (2x-10x cores) depending on I/O wait ratio. Little’s Law can help: N = arrival_rate * average_service_time. In practice, benchmark and tune.
- “What’s the ABA problem and can it affect this implementation?”
- Expected answer: ABA occurs in lock-free structures when a value changes A->B->A between read and CAS. With mutex-protected queues, ABA isn’t an issue. But if you tried to make a lock-free queue, you’d need hazard pointers or epoch-based reclamation.
- “What’s the thundering herd problem and how does it apply to thread pools?”
- Expected answer: When using broadcast, all waiting threads wake up but only one can proceed. The others waste CPU checking the condition and going back to sleep. Solution: use signal for single-item availability, broadcast only for shutdown.
- “How do you handle a task that takes too long or hangs?”
- Expected answer: Options include: (1) task timeout with cancellation, (2) watchdog thread that monitors execution time, (3) worker thread termination and replacement, (4) application-level heartbeat in tasks. Thread cancellation in C is tricky due to cleanup handlers.
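The pool-sizing answer above can be turned into arithmetic. This sketch combines Little's Law with the common "scale cores by I/O wait" rule of thumb; it is a heuristic starting point only, and `suggested_workers` and its parameters are illustrative names, not a standard API. Benchmark before trusting the result.

```c
/* Heuristic pool sizing. Little's Law: average tasks in flight
 * N = arrival_rate * average_service_time. The blocking rule scales
 * core count by the fraction of time tasks spend blocked on I/O
 * (0.0 = purely CPU-bound; must be < 1.0). Returns the larger of the
 * two estimates, rounded up, and at least 1. */
int suggested_workers(double arrivals_per_sec, double avg_service_sec,
                      double io_wait_fraction, int ncores) {
    double in_flight = arrivals_per_sec * avg_service_sec;     /* Little's Law */
    double by_cores  = ncores / (1.0 - io_wait_fraction);      /* blocking rule */
    double need = in_flight > by_cores ? in_flight : by_cores;
    int n = (int)need;
    if ((double)n < need) n++;   /* round up without pulling in libm */
    return n < 1 ? 1 : n;
}
```

For example, 16 tasks/sec at 0.5s each with 50% I/O wait on 4 cores suggests 8 workers, which matches the "2x cores for I/O-bound work" intuition.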
Hints in Layers
Layer 1 - Core Data Structures
typedef struct task {
void (*function)(void *arg);
void *arg;
struct task *next;
} task_t;
typedef struct threadpool {
pthread_mutex_t lock;
pthread_cond_t not_empty; // Signal when queue becomes non-empty
pthread_cond_t not_full; // Signal when queue has space (for bounded)
task_t *queue_head;
task_t *queue_tail;
int queue_size;
int queue_capacity;
pthread_t *workers;
int worker_count;
int shutdown; // 0 = running, 1 = graceful, 2 = immediate
} threadpool_t;
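The later layers call `enqueue()` and `dequeue()` without showing them. Here is one possible pair for the linked-list queue above. To keep the sketch standalone it operates on a `queue_view_t` holding just the three queue fields (in the real pool these live in `threadpool_t`); the caller must hold `pool->lock`.

```c
#include <stddef.h>

typedef struct task {
    void (*function)(void *);
    void *arg;
    struct task *next;
} task_t;

/* Just the queue fields, repeated so this compiles on its own. */
typedef struct {
    task_t *queue_head;
    task_t *queue_tail;
    int queue_size;
} queue_view_t;

/* Append at the tail: O(1), preserves FIFO order. */
static void enqueue(queue_view_t *pool, task_t *task) {
    task->next = NULL;
    if (pool->queue_tail)
        pool->queue_tail->next = task;   /* link after current tail */
    else
        pool->queue_head = task;         /* first item: head too */
    pool->queue_tail = task;
    pool->queue_size++;
}

/* Remove from the head: O(1). Returns NULL on an empty queue,
 * though callers normally check queue_size under the lock first. */
static task_t *dequeue(queue_view_t *pool) {
    task_t *task = pool->queue_head;
    if (!task) return NULL;
    pool->queue_head = task->next;
    if (!pool->queue_head)
        pool->queue_tail = NULL;         /* queue is now empty */
    pool->queue_size--;
    return task;
}
```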
Layer 2 - Worker Thread Loop Pattern
void *worker_thread(void *arg) {
threadpool_t *pool = (threadpool_t *)arg;
while (1) {
pthread_mutex_lock(&pool->lock);
// Wait while queue is empty AND not shutting down
while (pool->queue_size == 0 && !pool->shutdown) {
pthread_cond_wait(&pool->not_empty, &pool->lock);
}
// Check shutdown AFTER waking
if (pool->shutdown && pool->queue_size == 0) {
pthread_mutex_unlock(&pool->lock);
break;
}
task_t *task = dequeue(pool); // Remove from queue
pthread_cond_signal(&pool->not_full); // A slot opened - wake a blocked submitter
pthread_mutex_unlock(&pool->lock);
// Execute OUTSIDE the lock!
task->function(task->arg);
free(task);
}
return NULL;
}
Layer 3 - Submit with Backpressure
int threadpool_submit(threadpool_t *pool, void (*fn)(void*), void *arg) {
task_t *task = malloc(sizeof(task_t));
task->function = fn;
task->arg = arg;
task->next = NULL;
pthread_mutex_lock(&pool->lock);
// Block if queue is full (backpressure)
while (pool->queue_size >= pool->queue_capacity && !pool->shutdown) {
pthread_cond_wait(&pool->not_full, &pool->lock);
}
if (pool->shutdown) {
pthread_mutex_unlock(&pool->lock);
free(task);
return -1; // Rejected
}
enqueue(pool, task);
pthread_cond_signal(&pool->not_empty); // Wake ONE worker
pthread_mutex_unlock(&pool->lock);
return 0;
}
Layer 4 - Graceful Shutdown
void threadpool_shutdown(threadpool_t *pool, int graceful) {
pthread_mutex_lock(&pool->lock);
pool->shutdown = graceful ? 1 : 2;
pthread_cond_broadcast(&pool->not_empty); // Wake ALL workers
pthread_cond_broadcast(&pool->not_full); // Unblock any blocked submitters
pthread_mutex_unlock(&pool->lock);
// Join all workers
for (int i = 0; i < pool->worker_count; i++) {
pthread_join(pool->workers[i], NULL);
}
// If immediate shutdown, drain remaining tasks
if (!graceful) {
while (pool->queue_size > 0) {
task_t *t = dequeue(pool);
free(t); // Or call a cancellation callback
}
}
}
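The layers omit a C version of `threadpool_wait()`, which the tests below rely on. One possible translation of the earlier pseudocode follows; only the fields `wait()` touches are repeated (as a hypothetical `threadpool_wait_view_t`) so the sketch stands alone.

```c
#include <pthread.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  all_idle;   /* signaled when the last active worker finishes */
    int count;                  /* tasks still queued */
    int active_count;           /* tasks currently executing */
} threadpool_wait_view_t;

void threadpool_wait(threadpool_wait_view_t *pool) {
    pthread_mutex_lock(&pool->lock);
    /* while, not if: wakeups can be spurious, and more work may have
     * been submitted between the all_idle signal and our wakeup */
    while (pool->count > 0 || pool->active_count > 0)
        pthread_cond_wait(&pool->all_idle, &pool->lock);
    pthread_mutex_unlock(&pool->lock);
}
```

For this to work, workers must signal `all_idle` when they finish a task and both counters hit zero, as the worker pseudocode shows.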
Layer 5 - Testing for Correctness
// Test: No lost tasks under concurrent submit/shutdown
void stress_test() {
atomic_int completed = 0;
threadpool_t *pool = threadpool_create(4, 64);
// Submit from multiple producer threads simultaneously
pthread_t producers[8];
for (int i = 0; i < 8; i++) {
pthread_create(&producers[i], NULL, submit_1000_tasks, &completed);
}
// Wait a bit then request shutdown
usleep(100000);
threadpool_shutdown(pool, 1); // Graceful
for (int i = 0; i < 8; i++) {
pthread_join(producers[i], NULL);
}
// Verify: no accepted task was lost. Some of the 8000 submits may be
// rejected once shutdown begins, so have producers count successful
// submits and compare completed against that count, not a fixed 8000.
assert(completed <= 8000);
}
Testing Strategy
Unit Tests
// Test 1: Basic functionality
void test_single_task() {
int executed = 0;
threadpool_t *pool = threadpool_create(2, 10);
threadpool_submit(pool, increment_counter, &executed);
threadpool_wait(pool);
assert(executed == 1);
threadpool_shutdown(pool, 1);
threadpool_destroy(pool);
}
// Test 2: Queue bounds
void test_queue_bounds() {
threadpool_t *pool = threadpool_create(1, 5);
// Submit 5 tasks (should fill queue)
for (int i = 0; i < 5; i++) {
// Note: worker might process some before we fill
threadpool_submit(pool, slow_task, NULL);
}
// 6th task should block (test with timeout or separate thread)
// ...
threadpool_shutdown(pool, 1);
threadpool_destroy(pool);
}
// Test 3: Correct execution count
void test_execution_count() {
atomic_int counter = 0;
threadpool_t *pool = threadpool_create(4, 100);
for (int i = 0; i < 1000; i++) {
threadpool_submit(pool, atomic_increment, &counter);
}
threadpool_wait(pool);
assert(counter == 1000);
threadpool_shutdown(pool, 1);
threadpool_destroy(pool);
}
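The unit tests above reference helper task functions (`increment_counter`, `atomic_increment`, `slow_task`) without defining them. One plausible set is sketched here; the 10ms delay in `slow_task` is an arbitrary choice.

```c
#include <stdatomic.h>
#include <time.h>

/* Plain increment: only safe when a single task touches the counter,
 * as in test_single_task. */
void increment_counter(void *arg) {
    (*(int *)arg)++;
}

/* Atomic increment: safe when many tasks share one counter,
 * as in test_execution_count. */
void atomic_increment(void *arg) {
    atomic_fetch_add((atomic_int *)arg, 1);
}

/* Sleeps briefly so the queue stays occupied during bounds tests. */
void slow_task(void *arg) {
    (void)arg;
    struct timespec ts = {0, 10 * 1000 * 1000};  /* 10 ms */
    nanosleep(&ts, NULL);
}
```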
Stress Tests
// Stress test: Many producers, verify no lost tasks
void stress_test_many_producers() {
atomic_int completed = 0;
atomic_int submitted = 0;
threadpool_t *pool = threadpool_create(8, 256);
// 10 producer threads, each submitting 1000 tasks
pthread_t producers[10];
for (int i = 0; i < 10; i++) {
pthread_create(&producers[i], NULL, producer_fn,
&(producer_args_t){pool, 1000, &submitted, &completed});
}
// Wait for all producers to finish submitting
for (int i = 0; i < 10; i++) {
pthread_join(producers[i], NULL);
}
// Now wait for pool to complete all tasks
threadpool_wait(pool);
printf("Submitted: %d, Completed: %d\n", submitted, completed);
assert(submitted == completed);
threadpool_shutdown(pool, 1);
threadpool_destroy(pool);
}
// Stress test: Shutdown during heavy load
void stress_test_shutdown_under_load() {
atomic_int completed = 0;
threadpool_t *pool = threadpool_create(4, 64);
// Start submitting tasks continuously
pthread_t producer;
atomic_int keep_submitting = 1; // note: plain/volatile int is not a safe cross-thread flag
pthread_create(&producer, NULL, continuous_producer,
&(continuous_args_t){pool, &keep_submitting, &completed});
// Let it run for a bit
usleep(100000); // 100ms
// Signal producer to stop and request shutdown
keep_submitting = 0;
threadpool_shutdown(pool, 1); // Graceful
pthread_join(producer, NULL);
printf("Tasks completed: %d\n", completed);
// All submitted tasks should have completed
threadpool_destroy(pool);
}
Race Condition Detection
# Compile with ThreadSanitizer
gcc -fsanitize=thread -g -O1 -o threadpool_tsan src/*.c -lpthread
# Run with ThreadSanitizer
./threadpool_tsan --stress-test
# Run with Helgrind (Valgrind)
valgrind --tool=helgrind ./threadpool --stress-test
# Run with DRD (another Valgrind tool)
valgrind --tool=drd ./threadpool --stress-test
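Before trusting a "clean" report, it can help to confirm the tooling actually catches races. This deliberately racy sketch (an assumed `run_racy_demo` helper, not part of the project API) should make both ThreadSanitizer and Helgrind complain about the unsynchronized increment.

```c
#include <pthread.h>

static long racy_counter = 0;

/* Unsynchronized read-modify-write: a textbook data race. */
static void *bump(void *arg) {
    long n = *(long *)arg;
    for (long i = 0; i < n; i++)
        racy_counter++;
    return NULL;
}

/* Runs two racing threads; compile with -fsanitize=thread (or run under
 * helgrind) and the increment above should be flagged. */
long run_racy_demo(long per_thread) {
    pthread_t a, b;
    pthread_create(&a, NULL, bump, &per_thread);
    pthread_create(&b, NULL, bump, &per_thread);
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    return racy_counter;   /* often less than 2 * per_thread: lost updates */
}
```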
Test Matrix
| Test Category | Tests |
|---|---|
| Basic | Create/destroy, single task, multiple tasks |
| Concurrency | Multiple workers, multiple producers |
| Backpressure | Full queue blocking, queue drain |
| Shutdown | Graceful with pending, immediate with pending |
| Edge cases | Zero workers, zero capacity, NULL function |
| Stress | 10K+ tasks, many producers, long duration |
| Race detection | ThreadSanitizer clean, Helgrind clean |
Common Pitfalls & Debugging
Bug 1: Forgetting to Recheck Condition After Waking
// WRONG - spurious wakeup breaks this
if (pool->queue_size == 0)
pthread_cond_wait(&pool->not_empty, &pool->lock);
task = dequeue(); // Might crash on empty queue!
// RIGHT - while loop handles spurious wakeups
while (pool->queue_size == 0 && !pool->shutdown)
pthread_cond_wait(&pool->not_empty, &pool->lock);
Symptoms: Occasional crashes, “impossible” empty queue dequeues
Debug: Add assertions: assert(pool->queue_size > 0); before dequeue
Bug 2: Executing Task While Holding the Lock
// WRONG - blocks all other workers during task execution!
pthread_mutex_lock(&pool->lock);
task = dequeue(pool);
task->function(task->arg); // Could take seconds!
pthread_mutex_unlock(&pool->lock);
// RIGHT - release lock before executing
pthread_mutex_lock(&pool->lock);
task = dequeue(pool);
pthread_mutex_unlock(&pool->lock);
task->function(task->arg); // Other workers can proceed
Symptoms: Only one task executes at a time despite multiple workers
Debug: Add timing logs, observe serialized execution
Bug 3: Race Condition During Shutdown
// WRONG - worker might miss the shutdown signal
if (pool->shutdown) break; // Checked without lock!
pthread_cond_wait(...); // Might wait forever
// RIGHT - check with lock held, use broadcast for shutdown
pthread_mutex_lock(&pool->lock);
while (queue_empty && !pool->shutdown) {
pthread_cond_wait(...);
}
if (pool->shutdown && queue_empty) {
pthread_mutex_unlock(&pool->lock);
break;
}
Symptoms: Shutdown hangs, workers don’t terminate
Debug: Add logging, use timeout on join, check for deadlocks
Bug 4: Memory Leak on Rejected Tasks During Shutdown
// WRONG - caller doesn't know task was rejected
if (pool->shutdown) {
pthread_mutex_unlock(&pool->lock);
return; // task memory leaked!
}
// RIGHT - return error code, let caller handle cleanup
if (pool->shutdown) {
pthread_mutex_unlock(&pool->lock);
free(task);
return -1; // ESHUTDOWN
}
Symptoms: Memory leaks under stress with shutdown
Debug: Run under Valgrind, check leak summary
Bug 5: Signal Instead of Broadcast During Shutdown
// WRONG - only one worker wakes up!
pool->shutdown = 1;
pthread_cond_signal(&pool->not_empty); // Others stay blocked
// RIGHT - wake ALL workers for shutdown
pool->shutdown = 1;
pthread_cond_broadcast(&pool->not_empty); // All workers check and exit
Symptoms: Some workers don’t terminate on shutdown
Debug: Log worker exits, count threads
Bug 6: Forgetting to Signal not_full After Dequeue
// WRONG - producers might block forever
pthread_mutex_lock(&pool->lock);
task = dequeue(pool);
pthread_mutex_unlock(&pool->lock);
// Forgot to signal not_full!
// RIGHT - signal that space is available
pthread_mutex_lock(&pool->lock);
task = dequeue(pool);
pthread_cond_signal(&pool->not_full); // Wake blocked producer
pthread_mutex_unlock(&pool->lock);
Symptoms: Producers hang when queue was full
Debug: Add logging for queue depth, signal operations
Bug 7: Missing Index Wraparound in Circular Buffer
// WRONG - index is never wrapped, so it walks off the end of the array
pool->queue[pool->rear] = task;
pool->rear++; // Out of bounds once rear == capacity
// RIGHT - wrap with modulo on every increment
pool->queue[pool->rear] = task;
pool->rear = (pool->rear + 1) % pool->capacity;
Symptoms: Array out of bounds access
Debug: AddressSanitizer, bounds checking assertions
Bug 8: Use-After-Free of Task Structure
// WRONG - task might be freed before function returns
threadpool_submit(pool, fn, arg);
// If queue was not full, worker might execute immediately
// and free the task before this thread continues
// The task structure itself is fine, but be careful with arg:
void bad_usage() {
int local_var = 42;
threadpool_submit(pool, fn, &local_var);
// BUG: local_var goes out of scope, task uses dangling pointer!
}
// RIGHT - ensure arg lifetime exceeds task execution
void good_usage() {
int *heap_var = malloc(sizeof(int));
*heap_var = 42;
threadpool_submit(pool, fn_that_frees_arg, heap_var);
}
Symptoms: Corrupted data, crashes in task functions
Debug: AddressSanitizer, careful lifetime analysis
Extensions & Challenges
Beginner Extensions
- Statistics collection: Track queue depth, worker utilization, task latency
- Logging system: Configurable debug logging with timestamps
- Non-blocking submit: Return immediately with error if queue full
- Task IDs: Assign unique IDs to tasks for tracking
Intermediate Extensions
- Priority queue: Tasks with priorities, higher priorities execute first
- Task cancellation: Ability to cancel pending (not yet started) tasks
- Dynamic resizing: Add/remove workers at runtime
- Timeout on submit: Block for at most N seconds, then return error
Advanced Extensions
- Work stealing: Per-worker queues with stealing from idle workers
- Deadline scheduling: Tasks with deadlines, prioritize by urgency
- Dependent tasks: Submit task that waits for another task’s completion
- Thread-local storage: Efficient per-worker state (connection pools, etc.)
Integration Projects
- HTTP server: Use thread pool to handle concurrent connections
- Image processor: Parallel image filtering/transformation
- Web crawler: Concurrent URL fetching with bounded parallelism
- Database connection pool: Bounded pool of database connections
Books That Will Help
| Book | Chapters | What You’ll Learn |
|---|---|---|
| CS:APP 3e | 12.4-12.5 | Threads, mutexes, condition variables, thread safety |
| OSTEP | 26-32 | Locks, condition variables, semaphores, common concurrency bugs |
| TLPI | 29-33 | POSIX threads, mutexes, conditions, thread cancellation |
| C++ Concurrency in Action | 2-4 | Modern patterns (applicable to C with adaptation) |
| APUE 3e | 11-12 | Threads, thread control, thread synchronization |
Real-World Connections
Production Thread Pools
Java ThreadPoolExecutor:
- Configurable core and max pool sizes
- Multiple rejection policies (abort, caller-runs, discard)
- Work stealing in ForkJoinPool
Go runtime scheduler:
- M:N threading model
- Work stealing between OS threads
- Goroutines are lightweight “tasks”
Python concurrent.futures:
- ThreadPoolExecutor and ProcessPoolExecutor
- Future-based task submission
- Automatic result collection
libuv (Node.js):
- Thread pool for file I/O and DNS
- Default 4 threads, configurable
- Integrates with event loop
Tokio (Rust):
- Async runtime with work-stealing scheduler
- Task-based concurrency (not thread-per-task)
- Very efficient for I/O-bound work
Where Thread Pools Are Used
- Web servers: Handling concurrent connections (Apache worker MPM; nginx pairs event-loop workers with a small thread pool for blocking file I/O)
- Database engines: Query execution, background tasks
- Game engines: Physics, AI, rendering tasks
- Build systems: Parallel compilation (make -j)
- Scientific computing: Parallel simulations, data processing
Self-Assessment Checklist
Understanding
- I can explain the producer-consumer problem and its solutions
- I understand why `pthread_cond_wait` must be in a while loop
- I can describe when to use signal vs broadcast
- I understand the atomicity guarantee of `pthread_cond_wait`
- I can explain why executing tasks while holding the lock is bad
- I understand how shutdown coordination works
Implementation
- Pool creates workers that wait correctly on condition variable
- Submit blocks when queue is full (backpressure)
- Workers dequeue and execute tasks correctly
- Graceful shutdown completes all queued tasks
- Immediate shutdown terminates without deadlock
- No race conditions (ThreadSanitizer clean)
- No memory leaks (Valgrind clean)
Testing
- Basic tests pass (create, submit, wait, destroy)
- Stress tests pass (many producers, many tasks)
- Shutdown tests pass (graceful and immediate)
- Race detection tools report no issues
- Long-running stress test (hours) is stable
Growth
- I debugged at least one concurrency bug
- I understand the performance trade-offs
- I can explain thread pool sizing strategies
- I can discuss this project in an interview context
Project Comparison: When to Use What
┌────────────────────────────────────────────────────────────────────────────────┐
│ CONCURRENCY APPROACHES COMPARISON │
├────────────────────────────────────────────────────────────────────────────────┤
│ │
│ Approach Best For Trade-offs │
│ ──────── ──────── ────────── │
│ │
│ Thread-per-request Simple, low load Unbounded resources │
│ │
│ Thread pool Medium load, mixed work Fixed overhead, bounded │
│ │
│ Event loop I/O-bound, high load Complex programming model │
│ (async/await) │
│ │
│ Work stealing CPU-bound, high load Complex implementation │
│ │
│ Actor model Distributed systems Higher abstraction cost │
│ (Erlang/Akka) │
│ │
│ Thread Pool + Async Production systems Combines benefits │
│ (e.g., Tokio) │
│ │
└────────────────────────────────────────────────────────────────────────────────┘
Summary
Building a thread pool forces you to confront the core challenges of concurrent programming:
- Synchronization: Protecting shared state with mutexes and coordinating with condition variables
- Producer-consumer: The fundamental pattern underlying most concurrent systems
- Resource management: Bounded resources, backpressure, clean shutdown
- Debugging: Finding and fixing race conditions, deadlocks, and lost wakeups
This project is a rite of passage for systems programmers. The concepts you learn here apply to every concurrent system you’ll ever build or debug.
What’s next?
- Integrate your pool with a network server (P20)
- Explore async I/O and event loops
- Study work-stealing schedulers (Go, Tokio, Rayon)
- Implement lock-free data structures
“The best way to learn concurrency is to get it wrong, debug it, and finally understand why the correct solution works.”