Project 8: Thread Pool Implementation

Build a reusable thread pool that accepts work items, distributes them to worker threads, and handles graceful shutdown. This pattern is a building block of most high-performance servers.

Quick Reference

Attribute        Value
Difficulty       Level 4 - Expert
Time Estimate    2-3 weeks (30-50 hours)
Language         C (primary), Rust/Go/C++ (alternatives)
Prerequisites    Project 7 (Producer-Consumer), POSIX threads, mutex/condvar
Key Topics       Thread control, synchronization, resource management, API design

1. Learning Objectives

After completing this project, you will be able to:

  • Design thread-safe APIs that can be safely called from multiple threads without external synchronization
  • Implement work queue management using mutexes and condition variables to distribute tasks efficiently
  • Handle thread lifecycle management including creation, work distribution, and graceful termination
  • Understand thread attributes such as stack size, detached vs joinable threads, and thread-specific data
  • Implement graceful shutdown patterns that complete pending work before termination
  • Build reusable infrastructure code that can serve as the foundation for servers and parallel applications
  • Debug multi-threaded code using helgrind, ThreadSanitizer, and systematic testing

2. Theoretical Foundation

2.1 Core Concepts

A thread pool is a pattern for managing a collection of pre-created worker threads that wait for tasks to execute. Instead of creating a new thread for each task (expensive), you reuse existing threads (efficient).

Thread Pool Architecture

                    ┌─────────────────────────────────────────────────────────────┐
                    │                      THREAD POOL                            │
                    │                                                             │
   Client Thread    │   Work Queue              Worker Threads                    │
        │           │                                                             │
        │ submit()  │  ┌─────────────────┐     ┌────────────────┐                │
        │──────────────►│ Task 1        │────►│ Worker 0       │                │
        │           │  │ Task 2        │     │   (executing)   │                │
        │           │  │ Task 3        │     └────────────────┘                │
        │           │  │ ...           │                                         │
        │           │  └─────────────────┘     ┌────────────────┐                │
        │           │        │                 │ Worker 1       │                │
        │           │        │ dequeue()       │   (waiting)    │◄───────────┐   │
        │           │        │                 └────────────────┘            │   │
        │           │        ▼                                               │   │
        │           │  ┌─────────────────┐     ┌────────────────┐            │   │
        │           │  │ Mutex + Condvar │────►│ Worker 2       │            │   │
        │           │  └─────────────────┘     │   (executing)  │            │   │
        │           │                          └────────────────┘            │   │
        │           │                                                        │   │
        │           │                          ┌────────────────┐            │   │
        │           │                          │ Worker 3       │────────────┘   │
        │           │                          │   (waiting)    │                │
        │           │                          └────────────────┘                │
        │           │                                                             │
        │           └─────────────────────────────────────────────────────────────┘
        │
        ▼
   Continue execution
   (non-blocking submit)

Why Thread Pools Exist:

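Thread creation is not free. On Linux, pthread_create() typically costs tens of microseconds (a stack allocation plus a clone() system call), and each thread reserves its own stack, 8 MB of virtual address space by default on most distributions. For a web server handling 10,000 requests/second, creating a thread per request would:

  1. Spend a large share of CPU time (roughly 0.1-1 second of every second) creating and tearing down threads
  2. Reserve tens of gigabytes of stack address space whenever thousands of those threads are alive at once (10,000 threads x 8 MB = 80 GB)
  3. Cause heavy context-switching overhead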

Thread pools solve this by:

  • Pre-creating a fixed number of threads (amortizing creation cost)
  • Reusing threads across many tasks (no per-task creation overhead)
  • Limiting concurrency (preventing resource exhaustion)
  • Providing a clean API for task submission

2.2 Why This Matters

Real-World Usage:

Thread pools are the foundation of virtually every high-performance server and parallel application:

  • nginx uses a thread pool for file I/O operations (to avoid blocking the event loop)
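  • PostgreSQL dedicates one backend process to each client connection, which is why deployments often put a connection pooler such as PgBouncer in front of it (the same pooling idea applied to connections)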
  • Java’s ExecutorService is the standard way to manage concurrent tasks
  • Python’s concurrent.futures.ThreadPoolExecutor powers parallel task execution
  • libuv (Node.js’s I/O library) uses thread pools for file system operations
  • Database connection pools follow the same pattern for managing database connections

Career Impact:

Understanding thread pools is essential for:

  • Designing high-performance backend systems
  • Building scalable web servers
  • Implementing parallel data processing
  • Optimizing I/O-bound and CPU-bound workloads
  • Writing efficient game engines and simulations

Performance Numbers:

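Exact numbers depend on hardware and task size, but a well-implemented thread pool can:

  • Handle 100,000+ tasks/second on commodity hardware
  • Cut per-task overhead sharply compared to thread-per-task when tasks are short, because no thread is created or destroyed per task
  • Use far less memory than thread-per-task under load, because the number of thread stacks is fixed by the pool size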

2.3 Historical Context

Thread pools emerged as a solution to the limitations of early concurrent programming models:

1990s - Thread-Per-Request Era: Early web servers (Apache 1.x, early IIS) created a new process or thread for each request. This was simple but didn’t scale beyond a few hundred concurrent connections.

Early 2000s - Thread Pool Adoption: Java 1.5 (2004) introduced java.util.concurrent with ThreadPoolExecutor, standardizing the pattern. Apache Tomcat, JBoss, and other servers adopted thread pools.

2010s - Event Loop + Thread Pool Hybrid: Modern servers (nginx, Node.js) combine event loops for I/O with thread pools for blocking operations. This hybrid approach handles millions of concurrent connections.

The Pattern Endures: Despite advances in async/await and coroutines, thread pools remain fundamental. Even Rust’s tokio runtime schedules its async tasks on a pool of worker threads and keeps a separate thread pool for blocking operations (spawn_blocking).

2.4 Common Misconceptions

Misconception 1: “More threads = faster”

Reality: Adding threads beyond the number of CPU cores for CPU-bound work adds overhead without improving throughput. The optimal thread count depends on the workload:

  • CPU-bound: threads = number of cores
  • I/O-bound: threads = number of cores * (1 + wait_time/compute_time)
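
The sizing rule above can be turned into a small helper. This is a sketch under stated assumptions: the function names are hypothetical, wait_time and compute_time are per-task averages in the same unit, and _SC_NPROCESSORS_ONLN is a common extension (available on Linux and macOS) rather than a POSIX requirement.

```c
#include <assert.h>
#include <unistd.h>

/* Suggested worker count from: threads = cores * (1 + wait_time / compute_time).
 * A CPU-bound workload has wait_time == 0, which yields exactly `cores`. */
int suggested_pool_size(int cores, double wait_time, double compute_time)
{
    assert(cores > 0 && wait_time >= 0.0 && compute_time > 0.0);
    double ideal = cores * (1.0 + wait_time / compute_time);
    int threads = (int)ideal;
    if ((double)threads < ideal)
        threads++;                /* round up to a whole thread */
    return threads;
}

/* Number of online cores, falling back to 1 if the query fails. */
int online_cores(void)
{
    long n = sysconf(_SC_NPROCESSORS_ONLN);
    return n > 0 ? (int)n : 1;
}
```

For example, on 8 cores with tasks that wait 90 ms on I/O for every 10 ms of computation, suggested_pool_size(8, 90.0, 10.0) returns 80. Treat the result as a starting point and confirm it with a benchmark.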

Misconception 2: “Thread pools eliminate all synchronization needs”

Reality: Thread pools handle task distribution, but your tasks still need proper synchronization if they access shared state. The pool protects its internal state, not your data.

Misconception 3: “Graceful shutdown is optional”

Reality: Abrupt termination can leave resources in inconsistent states, leak memory, or corrupt data. Proper shutdown handling is essential for production systems.

Misconception 4: “Thread pools are only for servers”

Reality: Thread pools are useful for any parallel workload: image processing, data analysis, simulation, testing, and more.


3. Project Specification

3.1 What You Will Build

A reusable thread pool library with the following components:

  1. Public API (threadpool.h):
    • threadpool_create(num_threads) - Create a new thread pool
    • threadpool_submit(pool, function, arg) - Submit a task for execution
    • threadpool_wait(pool) - Wait for all pending tasks to complete
    • threadpool_destroy(pool, graceful) - Shutdown the pool
  2. Internal Implementation (threadpool.c):
    • Work queue with thread-safe enqueue/dequeue
    • Worker thread management
    • Graceful shutdown coordination
  3. Demo Application (threadpool_demo.c):
    • Shows usage patterns
    • Benchmarks performance
    • Tests edge cases

3.2 Functional Requirements

  1. Pool Creation
    • Accept number of worker threads as parameter
    • Spawn all worker threads on creation
    • Return opaque pool handle to caller
  2. Task Submission
    • Accept function pointer and void* argument
    • Thread-safe: callable from any thread
    • Non-blocking: return immediately after queuing
    • Return success/failure indication
  3. Task Execution
    • FIFO ordering: tasks are dequeued in submission order (with more than one worker, they may still finish out of order)
    • Each task runs on exactly one worker thread
    • No task is lost or executed multiple times
  4. Graceful Shutdown
    • Complete all pending tasks before exit (when graceful=1)
    • Immediate stop: abandon pending tasks (when graceful=0)
    • Wait for all workers to exit before returning
  5. Wait Operation
    • Block until the work queue is empty and no task is still executing
    • Return only once every task submitted before the call has completed
    • New tasks can still be submitted during wait

3.3 Non-Functional Requirements

  1. Performance
    • Task submission: < 1 microsecond average latency
    • Worker wakeup: < 10 microseconds from submission
    • Throughput: > 100,000 tasks/second with 4 workers
  2. Memory Safety
    • Zero memory leaks (verified by valgrind)
    • No use-after-free or double-free
    • Bounded memory usage (queue size configurable)
  3. Thread Safety
    • All public functions are thread-safe
    • No data races (verified by helgrind/tsan)
    • No deadlocks under any usage pattern
  4. Error Handling
    • Return error codes for all failure cases
    • Don’t crash on invalid input
    • Clean up properly on initialization failure

3.4 Example Usage / Output

# 1. Basic thread pool demo
$ ./threadpool_demo
Creating thread pool with 4 workers...
Pool created. Submitting 20 tasks...

[Worker 0] Processing task 1: computing...
[Worker 1] Processing task 2: computing...
[Worker 2] Processing task 3: computing...
[Worker 3] Processing task 4: computing...
[Worker 0] Task 1 complete (result: 42)
[Worker 0] Processing task 5: computing...
...
[Worker 2] Task 20 complete (result: 840)

All 20 tasks completed.
Destroying pool...
Pool destroyed. Workers joined.

# 2. Performance benchmark
$ ./threadpool_bench -w 8 -t 10000
Thread pool benchmark
Workers: 8
Tasks: 10000 (each simulates 1ms of work)

Single-threaded baseline: 10.02 seconds
Thread pool (8 workers): 1.28 seconds
Speedup: 7.83x

# 3. Shutdown test (immediate mode, graceful=0)
$ ./threadpool_demo --shutdown-test
Submitting 100 slow tasks (100ms each)...
After 500ms, requesting shutdown...

[Worker 0] Processing task 1...
[Worker 1] Processing task 2...
...
Shutdown requested. Completing in-flight tasks...
[Worker 3] Task 8 complete (last in-flight)

Tasks completed: 8
Tasks cancelled: 92
Shutdown complete.

# 4. Use as a library
$ cat my_app.c
#include <stdio.h>
#include "threadpool.h"

void my_task(void *arg) {
    printf("Processing: %s\n", (char *)arg);
}

int main(void) {
    threadpool_t *pool = threadpool_create(4);
    if (pool == NULL)
        return 1;
    threadpool_submit(pool, my_task, "task 1");
    threadpool_submit(pool, my_task, "task 2");
    threadpool_destroy(pool, 1);  // 1 = graceful: run queued tasks, then join workers
    return 0;
}

$ gcc -o my_app my_app.c threadpool.c -pthread
$ ./my_app
Processing: task 1
Processing: task 2

3.5 Real World Outcome

When your thread pool is complete, you will have:

  1. A working library that you can use in other projects
  2. Performance metrics showing speedup over single-threaded execution
  3. Verification output from valgrind showing zero memory leaks
  4. Clean API documentation in the header file

You’ll be able to run:

$ valgrind --leak-check=full ./threadpool_demo
...
All heap blocks were freed -- no leaks are possible

$ valgrind --tool=helgrind ./threadpool_demo
...
ERROR SUMMARY: 0 errors from 0 contexts

4. Solution Architecture

4.1 High-Level Design

Thread Pool Components

┌─────────────────────────────────────────────────────────────────────────────┐
│                           threadpool_t                                      │
│                                                                             │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                         Configuration                                │   │
│  │  num_threads: 4                                                     │   │
│  │  queue_capacity: 1024                                               │   │
│  │  shutdown: false                                                     │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                         Work Queue                                   │   │
│  │  ┌──────┬──────┬──────┬──────┬──────┬──────┬──────┬──────┐         │   │
│  │  │ T1   │ T2   │ T3   │ T4   │      │      │      │      │         │   │
│  │  └──────┴──────┴──────┴──────┴──────┴──────┴──────┴──────┘         │   │
│  │     ↑                    ↑                                          │   │
│  │   head                 tail                                         │   │
│  │   (dequeue)            (enqueue)                                    │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                      Synchronization                                 │   │
│  │  mutex: protects all shared state                                   │   │
│  │  work_available: signaled when new task added                       │   │
│  │  work_done: signaled when task completed                            │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                       Worker Threads                                 │   │
│  │  threads[0]: pthread_t (running task T5)                            │   │
│  │  threads[1]: pthread_t (waiting on work_available)                  │   │
│  │  threads[2]: pthread_t (running task T6)                            │   │
│  │  threads[3]: pthread_t (waiting on work_available)                  │   │
│  └─────────────────────────────────────────────────────────────────────┘   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

Data Flow:

    submit()                              worker()
       │                                     │
       ▼                                     ▼
   ┌───────┐     ┌───────────────────┐   ┌───────┐
   │ Lock  │────►│ Enqueue task      │   │ Lock  │
   │ mutex │     │ Signal workers    │   │ mutex │
   └───────┘     │ Unlock            │   └───────┘
                 └───────────────────┘       │
                                             ▼
                                     ┌───────────────────┐
                                     │ Wait if empty     │
                                     │ Dequeue task      │
                                     │ Unlock            │
                                     │ Execute task      │
                                     │ Signal completion │
                                     └───────────────────┘

4.2 Key Components

1. Task Structure

Encapsulates a unit of work: function pointer + argument.

2. Work Queue

Circular buffer holding pending tasks. Fixed size to bound memory usage.

3. Synchronization

  • One mutex protecting all shared state
  • Condition variable for “work available” (workers wait on this)
  • Optional condition variable for “work done” (for threadpool_wait)

4. Worker Thread Function

Infinite loop: wait for work, dequeue, execute, repeat. Exit when shutdown is signaled.

5. Lifecycle Management

  • Create: allocate structures, spawn threads
  • Submit: enqueue work, signal workers
  • Wait: block until queue empty and all workers idle
  • Destroy: signal shutdown, join threads, free resources

4.3 Data Structures

// Task: a unit of work
typedef struct {
    void (*function)(void *);  // Function to execute
    void *arg;                 // Argument to pass to function
} task_t;

// Thread pool structure
typedef struct {
    // Worker threads
    pthread_t *threads;        // Array of worker thread handles
    int num_threads;           // Number of worker threads

    // Work queue (circular buffer)
    task_t *queue;             // Array of tasks
    int queue_capacity;        // Maximum queue size
    int queue_head;            // Index of next task to dequeue
    int queue_tail;            // Index where next task will be enqueued
    int queue_count;           // Current number of tasks in queue

    // Synchronization
    pthread_mutex_t mutex;           // Protects all shared state
    pthread_cond_t work_available;   // Signaled when task added
    pthread_cond_t work_done;        // Signaled when task completed

    // State
    int shutdown;              // 0 = running, 1 = graceful shutdown, 2 = immediate
    int active_tasks;          // Number of tasks currently being executed
} threadpool_t;
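
The circular-buffer fields (head, tail, count, capacity) can be exercised on their own before any threads are involved. The sketch below uses a hypothetical stand-alone type ring_t and hypothetical ring_* functions; it is not thread-safe by itself, since in the pool these operations would run with the mutex held.

```c
#include <assert.h>
#include <stdlib.h>

typedef struct {
    void (*function)(void *);
    void *arg;
} task_t;

/* Stand-alone queue holding only the circular-buffer fields. */
typedef struct {
    task_t *slots;
    int capacity;
    int head;    /* index of the next task to dequeue */
    int tail;    /* index where the next task is stored */
    int count;   /* number of tasks currently queued */
} ring_t;

/* Returns 0 on success, -1 if memory cannot be allocated. */
int ring_init(ring_t *q, int capacity)
{
    assert(q != NULL && capacity > 0);
    q->slots = malloc((size_t)capacity * sizeof *q->slots);
    if (q->slots == NULL)
        return -1;
    q->capacity = capacity;
    q->head = q->tail = q->count = 0;
    return 0;
}

/* Returns 0 on success, -1 if the buffer is full. */
int ring_push(ring_t *q, task_t task)
{
    if (q->count == q->capacity)
        return -1;
    q->slots[q->tail] = task;
    q->tail = (q->tail + 1) % q->capacity;   /* wrap around at the end */
    q->count++;
    return 0;
}

/* Stores the oldest task in *out and returns 0, or returns -1 if empty. */
int ring_pop(ring_t *q, task_t *out)
{
    if (q->count == 0)
        return -1;
    *out = q->slots[q->head];
    q->head = (q->head + 1) % q->capacity;
    q->count--;
    return 0;
}

void ring_free(ring_t *q)
{
    free(q->slots);
    q->slots = NULL;
}
```

Keeping an explicit count, rather than comparing head and tail, removes the ambiguity between a full and an empty buffer, since head == tail in both cases.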

4.4 Algorithm Overview

Worker Thread Pseudocode:

worker_function(pool):
    loop forever:
        lock(mutex)

        # Wait for work or shutdown
        while queue_empty AND NOT shutdown:
            cond_wait(work_available, mutex)

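        # Check for shutdown: immediate (2) exits even if tasks remain;
        # graceful (1) exits only once the queue is drained
        if shutdown == 2 OR (shutdown AND queue_empty):
            unlock(mutex)
            return  # Exit thread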

        # Dequeue a task
        task = queue[head]
        head = (head + 1) % capacity
        count--
        active_tasks++

        unlock(mutex)

        # Execute task (outside lock!)
        task.function(task.arg)

        # Mark task complete
        lock(mutex)
        active_tasks--
        # Broadcast, not signal: several threads may be blocked in threadpool_wait()
        cond_broadcast(work_done)
        unlock(mutex)

Submit Pseudocode:

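threadpool_submit(pool, function, arg):
    # Reject invalid input before touching the pool
    if pool is NULL OR function is NULL:
        return ERROR

    lock(mutex)

    # Reject new work during shutdown or when the queue is full
    if shutdown OR count == capacity:
        unlock(mutex)
        return ERROR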

    # Enqueue the task
    queue[tail] = {function, arg}
    tail = (tail + 1) % capacity
    count++

    # Wake up a worker
    cond_signal(work_available)

    unlock(mutex)
    return SUCCESS

Graceful Shutdown Pseudocode:

threadpool_destroy(pool, graceful):
    lock(mutex)

    if graceful:
        shutdown = 1  # Workers will finish queue first
    else:
        shutdown = 2  # Workers will exit immediately

    # Wake up all waiting workers
    cond_broadcast(work_available)

    unlock(mutex)

    # Wait for all workers to exit
    for each thread:
        pthread_join(thread)

    # Free resources
    free(queue)
    free(threads)
    destroy(mutex)
    destroy(work_available)
    destroy(work_done)
    free(pool)
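
The worker, submit, and shutdown fragments can be combined into one compact C sketch. It is deliberately smaller than the project: the mini_* names are hypothetical, there is no wait operation or work_done condition variable, and pool creation simply fails if any worker cannot be started.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdlib.h>

typedef struct { void (*function)(void *); void *arg; } task_t;

typedef struct {
    pthread_t *threads;
    int num_threads;
    task_t *queue;
    int capacity, head, tail, count;
    pthread_mutex_t mutex;
    pthread_cond_t work_available;
    int shutdown;                 /* 0 = running, 1 = graceful, 2 = immediate */
} mini_pool_t;

static void *mini_worker(void *arg)
{
    mini_pool_t *p = arg;
    for (;;) {
        pthread_mutex_lock(&p->mutex);
        while (p->count == 0 && !p->shutdown)
            pthread_cond_wait(&p->work_available, &p->mutex);
        if (p->shutdown == 2 || (p->shutdown && p->count == 0)) {
            pthread_mutex_unlock(&p->mutex);
            return NULL;
        }
        task_t t = p->queue[p->head];
        p->head = (p->head + 1) % p->capacity;
        p->count--;
        pthread_mutex_unlock(&p->mutex);
        t.function(t.arg);        /* run the task without holding the lock */
    }
}

void mini_destroy(mini_pool_t *p, int graceful)
{
    pthread_mutex_lock(&p->mutex);
    p->shutdown = graceful ? 1 : 2;
    pthread_cond_broadcast(&p->work_available);   /* wake every idle worker */
    pthread_mutex_unlock(&p->mutex);
    for (int i = 0; i < p->num_threads; i++)
        pthread_join(p->threads[i], NULL);
    pthread_cond_destroy(&p->work_available);
    pthread_mutex_destroy(&p->mutex);
    free(p->queue);
    free(p->threads);
    free(p);
}

mini_pool_t *mini_create(int num_threads, int capacity)
{
    assert(num_threads > 0 && capacity > 0);
    mini_pool_t *p = calloc(1, sizeof *p);
    if (p == NULL)
        return NULL;
    p->capacity = capacity;
    p->threads = malloc((size_t)num_threads * sizeof *p->threads);
    p->queue = malloc((size_t)capacity * sizeof *p->queue);
    pthread_mutex_init(&p->mutex, NULL);
    pthread_cond_init(&p->work_available, NULL);
    while (p->threads && p->queue && p->num_threads < num_threads) {
        if (pthread_create(&p->threads[p->num_threads], NULL, mini_worker, p) != 0)
            break;
        p->num_threads++;         /* counts only workers that really started */
    }
    if (p->num_threads != num_threads) {
        mini_destroy(p, 0);       /* joins started workers, frees everything */
        return NULL;
    }
    return p;
}

/* Returns 0 on success, -1 on invalid input, shutdown, or a full queue. */
int mini_submit(mini_pool_t *p, void (*function)(void *), void *arg)
{
    if (p == NULL || function == NULL)
        return -1;
    pthread_mutex_lock(&p->mutex);
    if (p->shutdown || p->count == p->capacity) {
        pthread_mutex_unlock(&p->mutex);
        return -1;
    }
    p->queue[p->tail] = (task_t){ function, arg };
    p->tail = (p->tail + 1) % p->capacity;
    p->count++;
    pthread_cond_signal(&p->work_available);      /* one task, one wakeup */
    pthread_mutex_unlock(&p->mutex);
    return 0;
}

/* Example task: atomically increments the atomic_int passed as arg. */
void count_task(void *arg)
{
    atomic_fetch_add((atomic_int *)arg, 1);
}
```

A typical use is to create a pool with mini_create(4, 128), submit count_task one hundred times with the address of an atomic_int, and call mini_destroy(pool, 1). Because graceful shutdown drains the queue before the workers exit, the counter reads 100 afterwards.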

5. Implementation Guide

5.1 Development Environment Setup

# Install required tools
$ sudo apt-get install build-essential valgrind gdb

# Create project structure
$ mkdir threadpool && cd threadpool
$ touch threadpool.h threadpool.c threadpool_demo.c Makefile

# Makefile content
$ cat > Makefile << 'EOF'
CC = gcc
CFLAGS = -Wall -Wextra -Werror -g -O2 -pthread
LDFLAGS = -pthread

all: threadpool_demo

threadpool_demo: threadpool_demo.c threadpool.c threadpool.h
	$(CC) $(CFLAGS) -o $@ threadpool_demo.c threadpool.c $(LDFLAGS)

clean:
	rm -f threadpool_demo *.o

test: threadpool_demo
	./threadpool_demo

valgrind: threadpool_demo
	valgrind --leak-check=full --show-leak-kinds=all ./threadpool_demo

helgrind: threadpool_demo
	valgrind --tool=helgrind ./threadpool_demo

.PHONY: all clean test valgrind helgrind
EOF

5.2 Project Structure

threadpool/
├── threadpool.h        # Public API header
├── threadpool.c        # Implementation
├── threadpool_demo.c   # Demo and tests
├── threadpool_bench.c  # Performance benchmarks (optional)
├── Makefile
└── README.md

5.3 The Core Question You’re Answering

“How do you efficiently reuse threads to handle arbitrary work items while providing a clean API for clients?”

Thread creation is costly relative to a short task (typically tens of microseconds on Linux, plus a stack reservation). A thread pool amortizes this cost over many tasks. This pattern underlies nginx, Apache, database connection pools, and virtually every server.

Think about:

  • How to wake up exactly one waiting worker when work arrives
  • How to ensure no task is lost or executed twice
  • How to cleanly shut down when some workers are blocked waiting
  • How to handle the case where tasks arrive faster than they can be processed

5.4 Concepts You Must Understand First

Before coding, ensure you can answer these questions:

1. Thread Attributes

  • What is pthread_attr_t and what can you configure with it?
  • What’s the difference between a detached and joinable thread?
  • When would you need to set a custom stack size?
  • Book Reference: “APUE” Ch. 12.3

2. Thread Cancellation

  • What does pthread_cancel() do?
  • What are cancellation points?
  • What are cleanup handlers and why are they needed?
  • Book Reference: “APUE” Ch. 12.7

3. Thread-Specific Data

  • What is pthread_key_create() used for?
  • When would a thread pool need thread-local storage?
  • Book Reference: “APUE” Ch. 12.6

4. Condition Variable Semantics

  • Why must you hold the mutex when calling pthread_cond_wait()?
  • Why use while (condition) instead of if (condition)?
  • What’s the difference between signal() and broadcast()?

5.5 Questions to Guide Your Design

API Design:

  • Should threadpool_submit() block if the queue is full, or return an error?
  • Should there be a way for the caller to know when a specific task completes?
  • What happens if threadpool_submit() is called after threadpool_destroy()?

Shutdown Semantics:

  • Should graceful shutdown wait for pending tasks in the queue?
  • How should workers respond to shutdown while blocked on cond_wait?
  • What if a task takes forever to complete during shutdown?

Error Handling:

  • What if pthread_create() fails for some workers but not others?
  • What if malloc() fails during submission?
  • Should worker threads catch signals, or let them propagate?

5.6 Thinking Exercise

Thread Pool State Machine

Model the worker thread state machine:

                    ┌──────────────┐
                    │   Created    │
                    └──────┬───────┘
                           │ pthread_create()
                           │ calls worker_function()
                           ▼
              ┌────────────────────────┐
              │                        │
              │         Idle           │◄────────────┐
              │   (waiting for work)   │             │
              │   blocked on cond_wait │             │
              │                        │             │
              └────────────┬───────────┘             │
                           │ task available          │
                           │ (signaled by submit)    │
                           ▼                         │
              ┌────────────────────────┐             │
              │                        │             │
              │       Executing        │─────────────┘
              │    (running task)      │   task completed
              │   mutex NOT held       │   (signal work_done)
              │                        │
              └────────────┬───────────┘
                           │ shutdown signaled
                           │ AND queue empty
                           ▼
              ┌────────────────────────┐
              │                        │
              │      Terminating       │
              │  (returns from func)   │
              │   pthread_exit()       │
              │                        │
              └────────────────────────┘

Questions to consider:
  • What happens if shutdown is signaled while executing a task?
  • How does the transition from Idle to Executing work atomically?
  • What wakes up idle threads?
  • What if multiple threads are woken but only one task is available?

5.7 Hints in Layers

Hint 1: Work Item Structure

typedef struct {
    void (*function)(void *);
    void *arg;
} task_t;

This is all you need. The function pointer is the work to do, arg is the data.

Hint 2: Pool Structure

typedef struct {
    pthread_t *threads;
    int num_threads;
    task_t *queue;
    int queue_capacity;
    int queue_head, queue_tail, queue_count;
    pthread_mutex_t mutex;
    pthread_cond_t work_available;
    pthread_cond_t work_done;
    int shutdown;              // 0 = running, 1 = graceful, 2 = immediate
    int active_tasks;
} threadpool_t;

Hint 3: Worker Function Pattern

void *worker(void *arg) {
    threadpool_t *pool = (threadpool_t *)arg;
    while (1) {
        pthread_mutex_lock(&pool->mutex);

        // Sleep until there is work or a shutdown request
        while (pool->queue_count == 0 && !pool->shutdown) {
            pthread_cond_wait(&pool->work_available, &pool->mutex);
        }

        // Immediate shutdown (2) exits even with tasks queued;
        // graceful shutdown (1) exits only once the queue is empty
        if (pool->shutdown == 2 ||
            (pool->shutdown && pool->queue_count == 0)) {
            pthread_mutex_unlock(&pool->mutex);
            return NULL;
        }

        // Dequeue task
        task_t task = pool->queue[pool->queue_head];
        pool->queue_head = (pool->queue_head + 1) % pool->queue_capacity;
        pool->queue_count--;
        pool->active_tasks++;

        pthread_mutex_unlock(&pool->mutex);

        // Execute task OUTSIDE the lock
        task.function(task.arg);

        pthread_mutex_lock(&pool->mutex);
        pool->active_tasks--;
        // Broadcast: more than one thread may be blocked in threadpool_wait()
        pthread_cond_broadcast(&pool->work_done);
        pthread_mutex_unlock(&pool->mutex);
    }
}

Hint 4: Graceful Shutdown

Set the shutdown flag, broadcast to all workers (so they all wake up and see the flag), then join all threads. If shutdown == 1, workers finish their current task and drain the queue before exiting.

5.8 The Interview Questions They’ll Ask

  1. “How would you size a thread pool for CPU-bound vs I/O-bound work?”
    • CPU-bound: number of cores (more threads just adds context switch overhead)
    • I/O-bound: can be much larger, since threads spend time waiting
    • Formula: threads = cores * (1 + wait_time/compute_time)
  2. “What happens if a worker thread crashes?”
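    • A crash such as a segfault terminates the whole process, not just the worker that hit it
    • An uncaught C++ exception in a worker calls std::terminate, which also ends the process
    • A worker that merely exits (a task calls pthread_exit(), or the thread is cancelled) leaves the pool one thread smaller; production pools often monitor for this and restart workers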
  3. “How do you handle work stealing in a thread pool?”
    • Each worker has its own local queue
    • When local queue empty, steal from others
    • Used in fork/join frameworks for better cache locality
  4. “What’s the difference between a thread pool and an executor?”
    • Thread pool is the mechanism (reused threads)
    • Executor is the abstraction (submit work, get results)
    • In Java, ExecutorService is the interface and ThreadPoolExecutor is one implementation of it
  5. “How would you add priority to the work queue?”
    • Replace FIFO queue with priority queue (heap)
    • Higher priority tasks dequeued first
    • Careful of starvation of low-priority tasks

5.9 Books That Will Help

Topic                Book                                           Chapter
Thread control       “APUE” by Stevens & Rago                       Ch. 12
Thread attributes    “The Linux Programming Interface” by Kerrisk   Ch. 29-30
Real-world patterns  “C++ Concurrency in Action” by Williams        Ch. 9
Advanced sync        “Rust Atomics and Locks” by Mara Bos           Ch. 1-5
Design patterns      “Java Concurrency in Practice” by Goetz        Ch. 6-8

5.10 Implementation Phases

Phase 1: Basic Structure (2-3 hours)

  • Define data structures in header file
  • Implement threadpool_create() without starting threads
  • Implement threadpool_destroy() cleanup
  • Test memory allocation/deallocation with valgrind

Phase 2: Work Queue (2-3 hours)

  • Implement circular buffer operations (enqueue, dequeue)
  • Test queue operations in isolation
  • Handle full/empty conditions

Phase 3: Worker Threads (3-4 hours)

  • Implement worker function with wait loop
  • Start threads in threadpool_create()
  • Test that workers start and block correctly
  • Add simple logging to verify behavior

Phase 4: Task Submission (2-3 hours)

  • Implement threadpool_submit()
  • Wake workers on submission
  • Test with simple tasks
  • Verify tasks execute on different threads

Phase 5: Shutdown (3-4 hours)

  • Implement graceful shutdown (wait for pending)
  • Implement immediate shutdown (abandon pending)
  • Test shutdown with pending tasks
  • Verify no deadlocks

Phase 6: Wait Operation (1-2 hours)

  • Implement threadpool_wait()
  • Track active tasks
  • Signal when all work complete
  • Test concurrent submit and wait
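
The bookkeeping behind a wait operation can be prototyped separately from the pool. The sketch below is one possible shape, using a hypothetical tracker_t type and tracker_* functions: submit would call tracker_submitted(), a worker would call tracker_started() when it dequeues and tracker_finished() when the task returns, and threadpool_wait() would reduce to tracker_wait(). In the real pool these counters would live inside threadpool_t under the existing mutex.

```c
#include <assert.h>
#include <pthread.h>

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t work_done;
    int queued;   /* tasks submitted but not yet picked up by a worker */
    int active;   /* tasks currently executing */
} tracker_t;

void tracker_init(tracker_t *t)
{
    pthread_mutex_init(&t->mutex, NULL);
    pthread_cond_init(&t->work_done, NULL);
    t->queued = 0;
    t->active = 0;
}

void tracker_submitted(tracker_t *t)
{
    pthread_mutex_lock(&t->mutex);
    t->queued++;
    pthread_mutex_unlock(&t->mutex);
}

void tracker_started(tracker_t *t)
{
    pthread_mutex_lock(&t->mutex);
    assert(t->queued > 0);
    t->queued--;
    t->active++;
    pthread_mutex_unlock(&t->mutex);
}

void tracker_finished(tracker_t *t)
{
    pthread_mutex_lock(&t->mutex);
    assert(t->active > 0);
    t->active--;
    if (t->queued == 0 && t->active == 0)
        pthread_cond_broadcast(&t->work_done);   /* wake every waiter */
    pthread_mutex_unlock(&t->mutex);
}

/* Blocks until nothing is queued and nothing is executing. */
void tracker_wait(tracker_t *t)
{
    pthread_mutex_lock(&t->mutex);
    while (t->queued > 0 || t->active > 0)       /* re-check after each wakeup */
        pthread_cond_wait(&t->work_done, &t->mutex);
    pthread_mutex_unlock(&t->mutex);
}

/* Number of tasks that are queued or executing right now. */
int tracker_outstanding(tracker_t *t)
{
    pthread_mutex_lock(&t->mutex);
    int n = t->queued + t->active;
    pthread_mutex_unlock(&t->mutex);
    return n;
}

void tracker_destroy(tracker_t *t)
{
    pthread_cond_destroy(&t->work_done);
    pthread_mutex_destroy(&t->mutex);
}
```

Checking both counters matters: an empty queue alone does not mean the work is finished, because the last tasks may still be running on workers.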

Phase 7: Testing & Polish (3-4 hours)

  • Run valgrind for memory leaks
  • Run helgrind for data races
  • Add error handling for all failure cases
  • Write comprehensive tests
  • Benchmark performance

5.11 Key Implementation Decisions

Decision 1: Fixed vs Dynamic Queue Size

  • Fixed: simpler, bounded memory, submit can fail when full
  • Dynamic: more complex, unbounded memory, submit always succeeds
  • Recommendation: Start with fixed, add dynamic as extension

Decision 2: Signal vs Broadcast on Submit

  • Signal: wakes one worker (more efficient)
  • Broadcast: wakes all workers (safer but wasteful)
  • Recommendation: Use signal for submit, broadcast for shutdown

Decision 3: Blocking vs Non-blocking Submit

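  • Blocking: simpler for caller, but can deadlock if a task running on a worker submits to a full queue (every worker may end up waiting for space that only workers can free)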
  • Non-blocking: caller must handle failure
  • Recommendation: Non-blocking with error return; add blocking variant as option

Decision 4: Task Result Handling

  • Simple: fire-and-forget (no result)
  • Advanced: futures/promises (caller can wait for result)
  • Recommendation: Start simple, add futures as extension

6. Testing Strategy

6.1 Unit Tests

Test each component in isolation:

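#include <assert.h>

// init_queue(), enqueue(), dequeue(), and dummy_func() are internal helpers
// you write yourself. Because the tests declare threadpool_t on the stack,
// they need the full struct definition (for example from a private header).

// Test queue operations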
void test_queue_enqueue_dequeue() {
    threadpool_t pool;
    init_queue(&pool, 10);

    task_t task = {dummy_func, NULL};
    assert(enqueue(&pool, task) == 0);
    assert(pool.queue_count == 1);

    task_t result = dequeue(&pool);
    assert(pool.queue_count == 0);
    assert(result.function == dummy_func);
}

// Test queue full condition
void test_queue_full() {
    threadpool_t pool;
    init_queue(&pool, 2);

    task_t task = {dummy_func, NULL};
    assert(enqueue(&pool, task) == 0);
    assert(enqueue(&pool, task) == 0);
    assert(enqueue(&pool, task) == -1);  // Should fail
}

// Test queue wrap-around
void test_queue_wraparound() {
    threadpool_t pool;
    init_queue(&pool, 4);

    // Fill and drain queue multiple times
    for (int i = 0; i < 10; i++) {
        task_t task = {dummy_func, (void*)(long)i};
        assert(enqueue(&pool, task) == 0);
        task_t result = dequeue(&pool);
        assert(result.arg == (void*)(long)i);
    }
}

6.2 Integration Tests

Test the complete system:

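#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <unistd.h>
#include "threadpool.h"

// increment_counter(), slow_task(), and submit_many_tasks() are test helpers
// you write yourself.

// Test basic execution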
void test_basic_execution() {
    threadpool_t *pool = threadpool_create(2);
    atomic_int counter = 0;

    for (int i = 0; i < 100; i++) {
        threadpool_submit(pool, increment_counter, &counter);
    }

    threadpool_destroy(pool, 1);  // Wait for completion
    assert(counter == 100);
}

// Test concurrent submission
void test_concurrent_submit() {
    threadpool_t *pool = threadpool_create(4);

    // Launch 4 submitter threads
    pthread_t submitters[4];
    for (int i = 0; i < 4; i++) {
        pthread_create(&submitters[i], NULL, submit_many_tasks, pool);
    }

    // Wait for submitters
    for (int i = 0; i < 4; i++) {
        pthread_join(submitters[i], NULL);
    }

    threadpool_destroy(pool, 1);
    // Verify all tasks completed
}

// Test graceful shutdown
void test_graceful_shutdown() {
    threadpool_t *pool = threadpool_create(2);
    atomic_int completed = 0;

    // Submit slow tasks
    for (int i = 0; i < 10; i++) {
        threadpool_submit(pool, slow_task, &completed);
    }

    // Let a few tasks start, then request a graceful shutdown
    // while the rest are still queued or running
    usleep(50000);
    threadpool_destroy(pool, 1);  // Graceful: drains the queue first

    assert(completed == 10);  // All should complete
}
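
The integration tests above call increment_counter and slow_task without showing them. A minimal sketch of what they might look like, assuming tasks have the void (*)(void *) signature used by threadpool_submit and that the argument points to an atomic_int; the 10 ms delay is an arbitrary choice for illustration:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdatomic.h>
#include <time.h>

/* Hypothetical helper: atomically bump the counter passed as the task argument. */
static void increment_counter(void *arg) {
    atomic_int *counter = arg;
    atomic_fetch_add(counter, 1);
}

/* Hypothetical helper: simulate roughly 10 ms of work, then record completion. */
static void slow_task(void *arg) {
    atomic_int *completed = arg;
    struct timespec delay = { .tv_sec = 0, .tv_nsec = 10 * 1000 * 1000 };
    nanosleep(&delay, NULL);
    atomic_fetch_add(completed, 1);
}
```

Using an atomic counter keeps the helpers themselves race-free, so any data race that helgrind or ThreadSanitizer reports points at the pool rather than at the test code.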

6.3 Edge Cases to Test

  1. Empty pool usage: Create pool, destroy without submitting
  2. Single thread: Pool with 1 worker
  3. More tasks than workers: 100 tasks, 4 workers
  4. Rapid submit/destroy: Submit, immediately destroy
  5. Double destroy: Verify second destroy is safe (or errors gracefully)
  6. NULL function pointer: Should be rejected
  7. Submit during shutdown: Should fail cleanly
  8. Very long tasks: Ensure shutdown waits appropriately
  9. Memory pressure: Submit until queue is full
  10. Signal handling: SIGINT during execution

6.4 Verification Commands

# Memory leak detection
$ valgrind --leak-check=full --show-leak-kinds=all ./threadpool_demo
==12345== HEAP SUMMARY:
==12345==     in use at exit: 0 bytes in 0 blocks
==12345==     total heap usage: 50 allocs, 50 frees, 10,240 bytes allocated
==12345== All heap blocks were freed -- no leaks are possible

# Thread race detection
$ valgrind --tool=helgrind ./threadpool_demo
==12345== ERROR SUMMARY: 0 errors from 0 contexts

# Alternative: ThreadSanitizer (faster than helgrind)
$ gcc -fsanitize=thread -g -O2 -o threadpool_demo threadpool_demo.c threadpool.c -pthread
$ ./threadpool_demo
# Should show no warnings

# Stress test
$ for i in {1..100}; do ./threadpool_demo || echo "FAILED at iteration $i"; done

# Performance measurement
$ time ./threadpool_bench -w 8 -t 100000

7. Common Pitfalls & Debugging

Problem 1: “Threads don’t exit on shutdown”

Symptom: pthread_join() hangs forever

Why: Workers are blocked on pthread_cond_wait(), never see shutdown flag

Fix: After setting shutdown flag, call pthread_cond_broadcast() to wake ALL waiting workers

void threadpool_destroy(threadpool_t *pool, int graceful) {
    pthread_mutex_lock(&pool->mutex);
    pool->shutdown = graceful ? 1 : 2;
    pthread_cond_broadcast(&pool->work_available);  // CRITICAL!
    pthread_mutex_unlock(&pool->mutex);

    for (int i = 0; i < pool->num_threads; i++) {
        pthread_join(pool->threads[i], NULL);
    }
    // ... cleanup
}

Quick Test: Add printf in worker before/after cond_wait to verify wakeup
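
The wakeup can also be checked in isolation. The sketch below is not the pool itself: it uses hypothetical globals (shutdown_flag, exited) and workers that only wait, so the only thing it exercises is whether one broadcast releases every waiter.

```c
#include <assert.h>
#include <pthread.h>

#define NUM_WORKERS 4

/* Hypothetical minimal state: just enough to show the shutdown wakeup. */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t work_available = PTHREAD_COND_INITIALIZER;
static int shutdown_flag = 0;
static int exited = 0;

static void *idle_worker(void *arg) {
    (void)arg;
    pthread_mutex_lock(&mutex);
    while (!shutdown_flag) {
        pthread_cond_wait(&work_available, &mutex);
    }
    exited++;  /* still holding the mutex here */
    pthread_mutex_unlock(&mutex);
    return NULL;
}

/* Starts the workers, requests shutdown, and returns how many exited. */
static int run_shutdown_demo(void) {
    pthread_t threads[NUM_WORKERS];
    int created = 0;
    for (int i = 0; i < NUM_WORKERS; i++) {
        if (pthread_create(&threads[i], NULL, idle_worker, NULL) != 0) break;
        created++;
    }

    pthread_mutex_lock(&mutex);
    shutdown_flag = 1;
    pthread_cond_broadcast(&work_available);  /* wake every waiter */
    pthread_mutex_unlock(&mutex);

    for (int i = 0; i < created; i++) {
        pthread_join(threads[i], NULL);
    }
    return exited;
}
```

Replacing pthread_cond_broadcast with pthread_cond_signal in this sketch would typically leave most workers asleep, and the join loop would hang in the same way the symptom describes.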

Problem 2: “Tasks execute multiple times”

Symptom: Counter incremented more than expected

Why: Didn’t remove task from queue atomically while holding lock

Fix: Dequeue must happen BEFORE unlocking mutex

// WRONG
pthread_mutex_unlock(&pool->mutex);
task = pool->queue[pool->queue_head];  // Race condition!
// Another thread could also read this task

// CORRECT
task = pool->queue[pool->queue_head];
pool->queue_head = (pool->queue_head + 1) % pool->queue_capacity;
pool->queue_count--;
pthread_mutex_unlock(&pool->mutex);
// Task is now exclusively ours

Problem 3: “Pool crashes under load”

Symptom: Segfault or assertion failure with many tasks

Why: Queue overflow (enqueuing when full) or null pointer dereference

Fix: Check queue bounds before enqueue, validate all pointers

int threadpool_submit(threadpool_t *pool, void (*function)(void *), void *arg) {
    if (pool == NULL || function == NULL) {
        return -1;  // Invalid arguments
    }

    pthread_mutex_lock(&pool->mutex);

    if (pool->shutdown || pool->queue_count == pool->queue_capacity) {
        pthread_mutex_unlock(&pool->mutex);
        return -1;  // Cannot accept task
    }

    // Safe to enqueue
    pool->queue[pool->queue_tail].function = function;
    pool->queue[pool->queue_tail].arg = arg;
    pool->queue_tail = (pool->queue_tail + 1) % pool->queue_capacity;
    pool->queue_count++;

    pthread_cond_signal(&pool->work_available);
    pthread_mutex_unlock(&pool->mutex);
    return 0;
}

Problem 4: “Spurious task execution”

Symptom: Worker executes task when queue should be empty

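Why: Using if instead of while for the condition check. pthread_cond_wait() can return spuriously, and another worker may take the task between the signal and this worker's wakeup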

Fix: Always use while loop around cond_wait

// WRONG - spurious wakeup can cause empty queue access
if (pool->queue_count == 0 && !pool->shutdown) {
    pthread_cond_wait(&pool->work_available, &pool->mutex);
}
// queue might still be empty!

// CORRECT - re-check condition after every wakeup
while (pool->queue_count == 0 && !pool->shutdown) {
    pthread_cond_wait(&pool->work_available, &pool->mutex);
}
// queue is guaranteed non-empty (or shutdown)
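
Problems 2 and 4 meet in the worker loop. The following is a sketch of one way to arrange it, assuming a fixed-size ring buffer and the shutdown convention used above (1 = graceful, 2 = immediate). The mini_pool_t type, the QUEUE_CAPACITY constant, and the demo function are hypothetical and exist only to keep the example self-contained.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

#define QUEUE_CAPACITY 8

typedef struct { void (*function)(void *); void *arg; } task_t;

/* Hypothetical layout; field names follow the snippets above. */
typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t work_available;
    task_t queue[QUEUE_CAPACITY];
    int queue_head, queue_count;
    int shutdown;  /* 0 = running, 1 = graceful, 2 = immediate */
} mini_pool_t;

static void *worker(void *arg) {
    mini_pool_t *pool = arg;
    for (;;) {
        pthread_mutex_lock(&pool->mutex);
        while (pool->queue_count == 0 && !pool->shutdown) {
            pthread_cond_wait(&pool->work_available, &pool->mutex);
        }
        /* Exit on immediate shutdown, or on graceful shutdown once drained. */
        if (pool->shutdown == 2 || (pool->shutdown == 1 && pool->queue_count == 0)) {
            pthread_mutex_unlock(&pool->mutex);
            return NULL;
        }
        /* Dequeue while still holding the lock, then run the task unlocked. */
        task_t task = pool->queue[pool->queue_head];
        pool->queue_head = (pool->queue_head + 1) % QUEUE_CAPACITY;
        pool->queue_count--;
        pthread_mutex_unlock(&pool->mutex);
        task.function(task.arg);
    }
}

static void add_one(void *arg) { atomic_fetch_add((atomic_int *)arg, 1); }

/* Preloads 5 tasks, runs 2 workers, shuts down gracefully, returns tasks run. */
static int run_worker_demo(void) {
    mini_pool_t pool = {0};
    atomic_int done = 0;
    pthread_t threads[2];
    int created = 0;

    pthread_mutex_init(&pool.mutex, NULL);
    pthread_cond_init(&pool.work_available, NULL);
    for (int i = 0; i < 5; i++) pool.queue[i] = (task_t){ add_one, &done };
    pool.queue_count = 5;

    for (int i = 0; i < 2; i++) {
        if (pthread_create(&threads[i], NULL, worker, &pool) != 0) break;
        created++;
    }
    pthread_mutex_lock(&pool.mutex);
    pool.shutdown = 1;
    pthread_cond_broadcast(&pool.work_available);
    pthread_mutex_unlock(&pool.mutex);
    for (int i = 0; i < created; i++) pthread_join(threads[i], NULL);

    pthread_cond_destroy(&pool.work_available);
    pthread_mutex_destroy(&pool.mutex);
    return atomic_load(&done);
}
```

Running the task after releasing the mutex is a deliberate choice here: holding the lock during task.function would serialize all workers and block submitters for the duration of every task.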

Problem 5: “Memory leak on partial initialization failure”

Symptom: Valgrind reports leaks when pthread_create fails

Why: Not cleaning up already-created threads when one fails

Fix: Track how many threads were created, join them on failure

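threadpool_t *threadpool_create(int num_threads) {
    if (num_threads <= 0) return NULL;

    // calloc zeroes shutdown and the queue counters
    threadpool_t *pool = calloc(1, sizeof(threadpool_t));
    if (!pool) return NULL;

    pool->threads = malloc((size_t)num_threads * sizeof(pthread_t));
    if (!pool->threads) {
        free(pool);
        return NULL;
    }

    // Workers use these immediately, so initialize them before
    // the first pthread_create (check the return values in real code)
    pthread_mutex_init(&pool->mutex, NULL);
    pthread_cond_init(&pool->work_available, NULL);
    // ... allocate and initialize the task queue here

    pool->num_threads = 0;  // Track successfully created

    for (int i = 0; i < num_threads; i++) {
        if (pthread_create(&pool->threads[i], NULL, worker, pool) != 0) {
            // Failed! Clean up threads already created
            pthread_mutex_lock(&pool->mutex);
            pool->shutdown = 2;  // Immediate shutdown
            pthread_cond_broadcast(&pool->work_available);
            pthread_mutex_unlock(&pool->mutex);
            for (int j = 0; j < pool->num_threads; j++) {
                pthread_join(pool->threads[j], NULL);
            }
            pthread_cond_destroy(&pool->work_available);
            pthread_mutex_destroy(&pool->mutex);
            free(pool->threads);
            free(pool);
            return NULL;
        }
        pool->num_threads++;
    }

    return pool;
}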

8. Extensions & Challenges

8.1 Easy Extensions

  1. Task Counting: Add counters for submitted, completed, and failed tasks
  2. Queue Size Query: Function to return current queue length
  3. Worker Identification: Each worker knows its index (for logging)
  4. Task Timeouts: Optionally cancel tasks that take too long

8.2 Advanced Challenges

  1. Work Stealing: Each worker has local queue, steals from others when empty
  2. Dynamic Sizing: Grow/shrink thread count based on load
  3. Priority Queue: Higher priority tasks execute first
  4. Futures/Promises: Return a handle to wait for specific task completion
  5. Task Chaining: Submit tasks that depend on other tasks
  6. Affinity: Pin specific tasks to specific workers
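
As a starting point for the futures challenge, a future can be built from the same mutex and condition variable primitives the pool already uses. This is a minimal sketch under stated assumptions: future_t and its functions are hypothetical names, and a plain thread stands in for a pool worker so the example stays self-contained.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical future type: one result slot plus a "ready" flag. */
typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t ready_cond;
    int ready;
    void *result;
} future_t;

static void future_init(future_t *f) {
    pthread_mutex_init(&f->mutex, NULL);
    pthread_cond_init(&f->ready_cond, NULL);
    f->ready = 0;
    f->result = NULL;
}

/* Called by the worker once the task has produced its result. */
static void future_set(future_t *f, void *result) {
    pthread_mutex_lock(&f->mutex);
    f->result = result;
    f->ready = 1;
    pthread_cond_broadcast(&f->ready_cond);
    pthread_mutex_unlock(&f->mutex);
}

/* Called by the submitter; blocks until future_set has run. */
static void *future_get(future_t *f) {
    pthread_mutex_lock(&f->mutex);
    while (!f->ready) {
        pthread_cond_wait(&f->ready_cond, &f->mutex);
    }
    void *result = f->result;
    pthread_mutex_unlock(&f->mutex);
    return result;
}

static void future_destroy(future_t *f) {
    pthread_cond_destroy(&f->ready_cond);
    pthread_mutex_destroy(&f->mutex);
}

typedef struct { int input; int output; future_t done; } square_job_t;

/* Stand-in for a pool worker running one task. */
static void *square_worker(void *arg) {
    square_job_t *job = arg;
    job->output = job->input * job->input;
    future_set(&job->done, &job->output);
    return NULL;
}

static int run_future_demo(int x) {
    square_job_t job = { .input = x };
    pthread_t thread;
    future_init(&job.done);
    if (pthread_create(&thread, NULL, square_worker, &job) != 0) {
        future_destroy(&job.done);
        return -1;
    }
    int value = *(int *)future_get(&job.done);
    pthread_join(thread, NULL);
    future_destroy(&job.done);
    return value;
}
```

In a real pool the task wrapper would call future_set after the user function returns, and the open design question is who owns the future's memory once both sides are done with it.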

8.3 Research Topics

  1. Lock-Free Queues: Can you eliminate the mutex using atomic operations?
  2. NUMA Awareness: How should thread pools behave on multi-socket systems?
  3. Coroutine Integration: How do thread pools work with async/await patterns?
  4. Fair Scheduling: Ensure no submitter starves the pool

9. Real-World Connections

9.1 Production Systems Using This

  1. libuv (Node.js I/O): Thread pool for file system operations
  2. nginx: Thread pool for blocking operations (read, sendfile, temporary-file writes)
  3. Apache HTTPD: Worker MPM uses thread pool per child process
  4. PostgreSQL: Background writer and autovacuum workers apply the same idea with processes rather than threads
  5. Java ExecutorService: The standard thread pool abstraction
  6. Python ThreadPoolExecutor: concurrent.futures module
  7. Rust Rayon: Work-stealing thread pool for data parallelism

9.2 How the Pros Do It

nginx Thread Pool:

  • Uses a work queue with task chains
  • Workers can process multiple consecutive tasks without re-locking
  • Optimized for I/O completion notifications

Java ThreadPoolExecutor:

  • Core vs maximum pool size (elasticity)
  • Configurable rejection policies (abort, discard, discard-oldest, caller-runs)
  • Keep-alive time for idle threads
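
The caller-runs policy translates naturally to C: when the queue is full, the submitting thread executes the task itself, which throttles producers instead of dropping work. The sketch below illustrates only the policy; tiny_queue_t, try_enqueue, and submit_caller_runs are hypothetical names, and no workers drain the queue.

```c
#include <assert.h>
#include <pthread.h>

#define TINY_CAPACITY 2

typedef struct { void (*function)(void *); void *arg; } task_t;

/* Hypothetical bounded queue, only large enough to show the policy. */
typedef struct {
    pthread_mutex_t mutex;
    task_t slots[TINY_CAPACITY];
    int count;
} tiny_queue_t;

/* Returns 0 if the task was queued, -1 if the queue is full. */
static int try_enqueue(tiny_queue_t *q, task_t task) {
    int rc = -1;
    pthread_mutex_lock(&q->mutex);
    if (q->count < TINY_CAPACITY) {
        q->slots[q->count++] = task;
        rc = 0;
    }
    pthread_mutex_unlock(&q->mutex);
    return rc;
}

/* Caller-runs: returns 0 if queued, 1 if executed inline by the submitter. */
static int submit_caller_runs(tiny_queue_t *q, void (*function)(void *), void *arg) {
    task_t task = { function, arg };
    if (try_enqueue(q, task) == 0) return 0;
    function(arg);  /* queue full: run on the calling thread, outside the lock */
    return 1;
}

static void bump(void *arg) { (*(int *)arg)++; }

/* Submits 3 tasks to a 2-slot queue; returns how many ran inline. */
static int run_caller_runs_demo(void) {
    tiny_queue_t q = { .count = 0 };
    int ran_inline = 0;
    pthread_mutex_init(&q.mutex, NULL);
    for (int i = 0; i < 3; i++) {
        submit_caller_runs(&q, bump, &ran_inline);
    }
    pthread_mutex_destroy(&q.mutex);
    return ran_inline;
}
```

Because nothing consumes the queue in this demo, the first two submissions fill it and the third runs on the caller.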

libuv:

  • Single global thread pool (default 4 threads)
  • Focused on file I/O to avoid blocking event loop
  • Simple work request/completion callback model

9.3 Reading the Source

  1. libuv threadpool: https://github.com/libuv/libuv/blob/master/src/threadpool.c
    • Clean C implementation, ~300 lines
    • Good example of production-quality code
  2. nginx thread pool: https://github.com/nginx/nginx/blob/master/src/core/ngx_thread_pool.c
    • More complex, with task queuing and notification
    • Shows real-world optimizations
  3. OpenBSD’s task queues: Look at taskq_create(9) and task_add(9) (pool(9) is the kernel memory-pool allocator, not a thread pool)
    • BSD-style, well-documented

10. Resources

10.1 Man Pages

Essential reading:

$ man pthread_create    # Thread creation
$ man pthread_join      # Wait for thread termination
$ man pthread_mutex_lock # Mutex operations
$ man pthread_cond_wait  # Condition variables
$ man pthread_attr_init  # Thread attributes
$ man pthread_key_create # Thread-local storage
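
As a companion to the pthread_attr_init page, here is a small sketch of creating a joinable thread with an explicit stack size. The function name spawn_with_stack and the no-op thread body are hypothetical; the 256 KiB figure used in the usage note is only an example value and must be at least PTHREAD_STACK_MIN on the target system.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

static void *noop(void *arg) { return arg; }

/* Creates and joins one thread with the given stack size.
 * Returns 0 on success or the first pthread error code. */
static int spawn_with_stack(size_t stack_bytes) {
    pthread_attr_t attr;
    pthread_t thread;

    int rc = pthread_attr_init(&attr);
    if (rc != 0) return rc;

    rc = pthread_attr_setstacksize(&attr, stack_bytes);
    if (rc == 0) {
        rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    }
    if (rc == 0) {
        rc = pthread_create(&thread, &attr, noop, NULL);
    }
    /* The attribute object can be destroyed once the thread exists. */
    pthread_attr_destroy(&attr);

    if (rc == 0) {
        rc = pthread_join(thread, NULL);
    }
    return rc;
}
```

For example, spawn_with_stack(256 * 1024) requests a 256 KiB stack. Pool workers that run shallow tasks can often use a smaller stack than the platform default, which matters when the pool has many threads.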

10.2 Online Resources

  • POSIX Threads Programming: https://computing.llnl.gov/tutorials/pthreads/
  • Thread Pool Pattern: https://en.wikipedia.org/wiki/Thread_pool
  • Raw Linux Threads via System Calls: https://nullprogram.com/blog/2015/05/15/

10.3 Book Chapters

Book                            Relevant Chapters
“APUE” by Stevens               Ch. 11 (Threads), Ch. 12 (Thread Control)
“TLPI” by Kerrisk               Ch. 29 (Threads), Ch. 30 (Thread Sync)
“C++ Concurrency in Action”     Ch. 9 (Thread Pools)
“Rust Atomics and Locks”        Ch. 1-5 (Foundations)

11. Self-Assessment Checklist

Before considering this project complete, verify:

  • I can explain why thread pools are more efficient than thread-per-task
  • I understand why condition variable waits must be in a while loop
  • My implementation handles graceful shutdown correctly
  • My implementation handles immediate shutdown correctly
  • I can submit tasks from multiple threads simultaneously
  • valgrind reports zero memory leaks
  • helgrind (or tsan) reports zero data races
  • I can explain the difference between signal and broadcast
  • My code handles all error cases (malloc failure, pthread_create failure)
  • I can answer all five interview questions confidently
  • I understand how to size a thread pool for different workloads
  • I could extend this to support task priorities or futures

12. Submission / Completion Criteria

Your project is complete when:

  1. All tests pass: Basic execution, concurrent submission, shutdown tests
  2. Zero memory leaks: Verified by valgrind --leak-check=full
  3. Zero data races: Verified by helgrind or ThreadSanitizer
  4. Clean compilation: No warnings with -Wall -Wextra -Werror
  5. Performance verified: Benchmark shows expected speedup
  6. API documented: Header file has clear comments
  7. Edge cases handled: Full queue, empty pool, immediate destroy

Deliverables:

  • threadpool.h - Public API with documentation
  • threadpool.c - Complete implementation
  • threadpool_demo.c - Usage examples and tests
  • Makefile - Build with valgrind/helgrind targets
  • Console output showing successful tests and zero valgrind errors

Stretch Goals:

  • Performance benchmark comparing to single-threaded baseline
  • Support for task priorities
  • Support for waiting on specific task completion (futures)