Project 7: Producer-Consumer with POSIX Threads

Build a producer-consumer system with multiple producer and consumer threads sharing a bounded buffer, using mutexes and condition variables for synchronization.

Quick Reference

Attribute        Value
---------        -----
Difficulty       Level 3: Advanced
Time Estimate    1 week (15-20 hours)
Language         C (Alternatives: Rust, Go, C++)
Prerequisites    Basic C, understanding of thread concepts, memory model basics
Key Topics       pthread_create/join, mutexes, condition variables, bounded buffer, race conditions

1. Learning Objectives

By completing this project, you will:

  1. Create and manage POSIX threads - Use pthread_create(), pthread_join(), pass arguments and get return values.
  2. Implement mutex synchronization - Protect shared data with pthread_mutex_lock/unlock.
  3. Master condition variables - Use pthread_cond_wait/signal/broadcast for thread coordination.
  4. Build the bounded buffer pattern - The canonical producer-consumer synchronization problem.
  5. Avoid common threading bugs - Understand spurious wakeups, lost wakeups, and deadlocks.
  6. Design thread-safe APIs - Think about API design for concurrent access.

2. Theoretical Foundation

2.1 Core Concepts

The Producer-Consumer Problem is the canonical synchronization challenge. Multiple producers create work items, multiple consumers process them, and they share a fixed-size buffer.

┌─────────────────────────────────────────────────────────────────────┐
│                         Bounded Buffer                               │
│                                                                       │
│   Producer 1 ──┐                                     ┌── Consumer 1   │
│                │     ┌───┬───┬───┬───┬───┐          │                │
│   Producer 2 ──┼────>│ A │ B │ C │   │   │──────────┼── Consumer 2   │
│                │     └───┴───┴───┴───┴───┘          │                │
│   Producer 3 ──┘         ^           ^              └── Consumer 3   │
│                          │           │                                │
│                        head        tail                               │
│                     (consume)     (produce)                           │
└─────────────────────────────────────────────────────────────────────┘

Constraints:
- Buffer has fixed capacity (e.g., 10 slots)
- Producers must WAIT if buffer is full
- Consumers must WAIT if buffer is empty
- Only ONE thread can modify buffer at a time

Why is this hard?

Without synchronization:

  1. Two producers might write to the same slot (data corruption)
  2. A consumer might read from an empty buffer (garbage data)
  3. The count of items might become wrong (lost updates)
  4. Threads might spin forever (no notification of state changes)

The Solution: Mutex + Condition Variables

┌─────────────────────────────────────────────────────────────────────┐
│                      Synchronization Primitives                      │
│                                                                       │
│   ┌─────────────┐     Protects all buffer access                     │
│   │   Mutex     │     Only one thread can hold at a time             │
│   └─────────────┘                                                     │
│                                                                       │
│   ┌─────────────┐     Producers wait here when buffer FULL           │
│   │  CV: not_full │   Signaled by consumers after taking item        │
│   └─────────────┘                                                     │
│                                                                       │
│   ┌─────────────┐     Consumers wait here when buffer EMPTY          │
│   │ CV: not_empty│    Signaled by producers after adding item        │
│   └─────────────┘                                                     │
└─────────────────────────────────────────────────────────────────────┘

2.2 Why This Matters

This pattern appears EVERYWHERE in systems programming:

  • Web servers: Request queue between acceptor and worker threads
  • Databases: Transaction log buffer between clients and disk writer
  • Operating systems: I/O request queues between processes and drivers
  • Video processing: Frame buffers between decoder and encoder
  • Message queues: RabbitMQ, Kafka, Redis queues - all variations of this

Master this pattern and you have the core of mutex and condition-variable synchronization: most blocking queues and thread pools are variations of it.

2.3 The Critical Pattern: While Loop for Condition Wait

THIS IS THE MOST IMPORTANT THING TO UNDERSTAND:

// WRONG - using if
pthread_mutex_lock(&mutex);
if (buffer_empty) {
    pthread_cond_wait(&not_empty, &mutex);
}
// Use item - BUG: might still be empty!
pthread_mutex_unlock(&mutex);

// CORRECT - using while
pthread_mutex_lock(&mutex);
while (buffer_empty) {         // <-- WHILE, not IF
    pthread_cond_wait(&not_empty, &mutex);
}
// Use item - guaranteed not empty
pthread_mutex_unlock(&mutex);

Why while instead of if?

  1. Spurious wakeups: POSIX allows cond_wait to return even without signal
  2. Stolen wakeup: Another consumer might grab the item first
  3. Broadcast: All waiters wake, but only one can proceed

The while loop re-checks the condition after every wakeup.


3. Project Specification

3.1 What You Will Build

A complete producer-consumer implementation with:

  • Configurable number of producers and consumers
  • Bounded circular buffer with configurable size
  • Command-line interface for parameters
  • Statistics collection (throughput, latency)
  • Graceful shutdown coordination

3.2 Functional Requirements

Requirement  Description
-----------  -----------
R1           Multiple producer threads adding items to buffer
R2           Multiple consumer threads removing items from buffer
R3           Bounded buffer with configurable capacity
R4           Proper blocking when buffer full (producers) or empty (consumers)
R5           No lost items, no duplicate consumption
R6           Graceful shutdown - complete pending items, exit cleanly
R7           Thread-safe statistics collection

3.3 Non-Functional Requirements

  • Zero data races (verified with helgrind/tsan)
  • Zero deadlocks
  • All items produced are consumed exactly once
  • Performance: 50,000+ items/second throughput
  • Memory: No leaks (verified with valgrind)

3.4 Example Output

# Basic run
$ ./prodcons
Producer-Consumer Demo
Buffer size: 10, Producers: 2, Consumers: 2, Items: 1000

[P0] Produced item 0 (buffer: 1/10)
[P1] Produced item 1 (buffer: 2/10)
[C0] Consumed item 0 from P0 (buffer: 1/10)
[C1] Consumed item 1 from P1 (buffer: 0/10)
[P0] Produced item 2 (buffer: 1/10)
...

Summary:
  Items produced:  1000
  Items consumed:  1000
  Throughput:      45,230 items/sec
  Avg latency:     0.022 ms
  Max buffer fill: 10 (hit capacity)

# Stress test
$ ./prodcons -p 8 -c 8 -n 100000 -b 100
Producer-Consumer Stress Test
Producers: 8, Consumers: 8, Items: 100000, Buffer: 100

Running... (progress updates)
  25000/100000 (25%)
  50000/100000 (50%)
  75000/100000 (75%)

PASSED: All items accounted for
  Produced: 100000
  Consumed: 100000
  Elapsed:  1.23 seconds
  Throughput: 81,300 items/sec

# Verbose mode to see buffer state
$ ./prodcons -v -b 5 -n 10
Buffer visualization enabled

Buffer: [     ] 0/5
[P0] +item1  Buffer: [#    ] 1/5
[P1] +item2  Buffer: [##   ] 2/5
[P0] +item3  Buffer: [###  ] 3/5
[C0] -item1  Buffer: [##   ] 2/5
[C1] -item2  Buffer: [#    ] 1/5
[P1] +item4  Buffer: [##   ] 2/5
...
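
The flags used in the sample runs (-p, -c, -n, -b, -v) could be parsed with getopt. The following is a minimal sketch, assuming the defaults shown in the basic run (2 producers, 2 consumers, 1000 items, buffer of 10). The names config_t, parse_positive and parse_args are hypothetical and not part of the specification.

```c
#define _POSIX_C_SOURCE 200809L
#include <stdlib.h>
#include <unistd.h>

/* Hypothetical run configuration filled in from the command line */
typedef struct {
    int producers;
    int consumers;
    int items;
    int buffer_size;
    int verbose;
} config_t;

/* Parse a strictly positive integer; returns -1 on malformed input */
static int parse_positive(const char *s) {
    char *end;
    long v = strtol(s, &end, 10);
    if (*s == '\0' || *end != '\0' || v <= 0 || v > 1000000000L)
        return -1;
    return (int)v;
}

/* Returns 0 on success, -1 on an unknown flag or a bad value */
int parse_args(int argc, char **argv, config_t *cfg) {
    /* Defaults assumed from the basic sample run */
    *cfg = (config_t){ .producers = 2, .consumers = 2,
                       .items = 1000, .buffer_size = 10, .verbose = 0 };
    int opt;
    optind = 1;  /* restart scanning in case getopt was used before */
    while ((opt = getopt(argc, argv, "p:c:n:b:v")) != -1) {
        int *target = NULL;
        switch (opt) {
        case 'p': target = &cfg->producers;   break;
        case 'c': target = &cfg->consumers;   break;
        case 'n': target = &cfg->items;       break;
        case 'b': target = &cfg->buffer_size; break;
        case 'v': cfg->verbose = 1;           continue;  /* flag takes no value */
        default:  return -1;                             /* unknown option */
        }
        if ((*target = parse_positive(optarg)) < 0)
            return -1;
    }
    return 0;
}
```

A main function would call parse_args(argc, argv, &cfg) first and print a usage message when it returns -1.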

4. Solution Architecture

4.1 High-Level Design

┌─────────────────────────────────────────────────────────────────────┐
│                            Main Thread                               │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │ 1. Parse args  2. Init buffer  3. Create threads  4. Join all │  │
│  └───────────────────────────────────────────────────────────────┘  │
└───────────────────────────────────────────────────────────────────────┘
                                    │
                    ┌───────────────┼───────────────┐
                    │               │               │
                    v               v               v
            ┌───────────┐   ┌───────────┐   ┌───────────┐
            │ Producer  │   │ Producer  │   │ Consumer  │ ...
            │ Thread 0  │   │ Thread 1  │   │ Thread 0  │
            └─────┬─────┘   └─────┬─────┘   └─────┬─────┘
                  │               │               │
                  └───────────────┴───────────────┘
                                  │
                                  v
                    ┌─────────────────────────────┐
                    │       Shared Buffer          │
                    │  ┌─────────────────────┐    │
                    │  │ mutex               │    │
                    │  │ not_full (cond var) │    │
                    │  │ not_empty(cond var) │    │
                    │  │ items[capacity]     │    │
                    │  │ head, tail, count   │    │
                    │  └─────────────────────┘    │
                    └─────────────────────────────┘
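
The create/join flow in the diagram can be tried in isolation before any buffer exists. The sketch below is one possible way to pass an argument to each thread and read its result back through pthread_join. The summing workload and the names range_arg_t, sum_range and parallel_sum are illustrative assumptions, not part of the project.

```c
#include <pthread.h>

#define MAX_WORKERS 16

/* Hypothetical argument block: each worker sums the integers in [lo, hi) */
typedef struct {
    int lo;
    int hi;
    long sum;   /* written by the worker, read by the parent after join */
} range_arg_t;

static void *sum_range(void *p) {
    range_arg_t *arg = p;
    arg->sum = 0;
    for (int i = arg->lo; i < arg->hi; i++)
        arg->sum += i;
    return arg;   /* this pointer is what pthread_join hands back */
}

/* Split [0, n) across nthreads workers and add up the partial sums.
 * Returns -1 if a thread could not be created or joined. */
long parallel_sum(int n, int nthreads) {
    pthread_t tids[MAX_WORKERS];
    range_arg_t args[MAX_WORKERS];
    if (nthreads < 1 || nthreads > MAX_WORKERS)
        return -1;

    int chunk = n / nthreads;
    int started = 0;
    int failed = 0;
    long total = 0;

    for (int t = 0; t < nthreads; t++) {
        args[t].lo = t * chunk;
        args[t].hi = (t == nthreads - 1) ? n : (t + 1) * chunk;
        /* Each thread gets its own args[t]; sharing one struct would be a race */
        if (pthread_create(&tids[t], NULL, sum_range, &args[t]) != 0) {
            failed = 1;
            break;
        }
        started++;
    }

    /* Join only the threads that were actually started */
    for (int t = 0; t < started; t++) {
        void *ret = NULL;
        if (pthread_join(tids[t], &ret) != 0) {
            failed = 1;
            continue;
        }
        total += ((range_arg_t *)ret)->sum;
    }
    return failed ? -1 : total;
}
```

The argument array lives in the parent's stack frame, which is safe here only because the parent joins every thread before returning.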

4.2 Key Components

Component    Responsibility                  Key Decisions
---------    --------------                  -------------
Buffer       Hold items, track count         Circular array with head/tail
Producer     Generate items, add to buffer   Block when full
Consumer     Remove items, process           Block when empty
Shutdown     Coordinate clean exit           Poison pill or done flag
Statistics   Track throughput/latency        Atomic counters or mutex
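
For the "atomic counters" option in the Statistics row, a sketch using C11 <stdatomic.h> might look like the following. The stats_t layout and the function names are assumptions made for illustration. Relaxed ordering is used on the assumption that the totals are only read after all threads have been joined.

```c
#include <stdatomic.h>

/* Hypothetical statistics block shared by all threads */
typedef struct {
    atomic_long produced;
    atomic_long consumed;
    atomic_llong latency_ns_total;   /* sum of per-item latencies */
} stats_t;

void stats_init(stats_t *s) {
    atomic_init(&s->produced, 0);
    atomic_init(&s->consumed, 0);
    atomic_init(&s->latency_ns_total, 0);
}

/* Called by a producer after a successful put */
void stats_on_produce(stats_t *s) {
    atomic_fetch_add_explicit(&s->produced, 1, memory_order_relaxed);
}

/* Called by a consumer after a successful get */
void stats_on_consume(stats_t *s, long long latency_ns) {
    atomic_fetch_add_explicit(&s->consumed, 1, memory_order_relaxed);
    atomic_fetch_add_explicit(&s->latency_ns_total, latency_ns,
                              memory_order_relaxed);
}

/* Average latency in milliseconds. The two loads are not one consistent
 * snapshot, so call this after the worker threads have been joined. */
double stats_avg_latency_ms(stats_t *s) {
    long n = atomic_load(&s->consumed);
    if (n == 0)
        return 0.0;
    return (double)atomic_load(&s->latency_ns_total) / (double)n / 1e6;
}
```

The mutex alternative is simpler to reason about when several fields must change together. Atomics avoid taking a second lock on the hot path.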

4.3 Data Structures

#include <pthread.h>  /* pthread_mutex_t, pthread_cond_t */
#include <time.h>     /* struct timespec */

/* Work item - what producers create and consumers process */
typedef struct {
    int id;                  /* Unique item ID */
    int producer_id;         /* Who produced it */
    struct timespec created; /* When produced (for latency) */
} work_item_t;

/* Thread-safe bounded buffer */
typedef struct {
    work_item_t *items;      /* Circular buffer array */
    int capacity;            /* Maximum items */
    int count;               /* Current items in buffer */
    int head;                /* Next position to consume */
    int tail;                /* Next position to produce */

    pthread_mutex_t mutex;   /* Protects all fields */
    pthread_cond_t not_full; /* Signal when space available */
    pthread_cond_t not_empty;/* Signal when items available */

    int done;                /* Shutdown flag */
} buffer_t;

/* Per-thread context */
typedef struct {
    int thread_id;
    buffer_t *buffer;
    int items_to_produce;    /* For producers */
    int items_produced;      /* Stats */
    int items_consumed;      /* Stats */
} thread_ctx_t;

4.4 Algorithm: Producer

producer(ctx):
    for i in range(items_to_produce):
        item = create_item(i, ctx.thread_id)

        mutex_lock(buffer.mutex)

        // Wait while buffer is full
        while (buffer.count == buffer.capacity AND NOT buffer.done):
            cond_wait(buffer.not_full, buffer.mutex)

        if (buffer.done):
            mutex_unlock(buffer.mutex)
            break

        // Add item to buffer
        buffer.items[buffer.tail] = item
        buffer.tail = (buffer.tail + 1) % buffer.capacity
        buffer.count++

        // Signal consumers
        cond_signal(buffer.not_empty)

        mutex_unlock(buffer.mutex)

        // Count only items that actually reached the buffer
        ctx.items_produced++

    return items_produced

4.5 Algorithm: Consumer

consumer(ctx):
    while (true):
        mutex_lock(buffer.mutex)

        // Wait while buffer is empty
        while (buffer.count == 0 AND NOT buffer.done):
            cond_wait(buffer.not_empty, buffer.mutex)

        // Check for shutdown with empty buffer
        if (buffer.count == 0 AND buffer.done):
            mutex_unlock(buffer.mutex)
            break

        // Remove item from buffer
        item = buffer.items[buffer.head]
        buffer.head = (buffer.head + 1) % buffer.capacity
        buffer.count--

        // Signal producers
        cond_signal(buffer.not_full)

        mutex_unlock(buffer.mutex)

        // Process item (outside lock!)
        process(item)
        ctx.items_consumed++

    return items_consumed

5. Implementation Guide

5.1 Development Environment Setup

# Install dependencies
sudo apt-get install build-essential valgrind

# Compile with thread support
gcc -pthread -Wall -Wextra -o prodcons prodcons.c

# For thread sanitizer (find races)
gcc -pthread -fsanitize=thread -o prodcons prodcons.c

5.2 Project Structure

prodcons/
├── src/
│   ├── main.c           # Entry point, argument parsing
│   ├── buffer.c         # Buffer implementation
│   ├── buffer.h         # Buffer API
│   ├── producer.c       # Producer thread function
│   ├── consumer.c       # Consumer thread function
│   └── stats.c          # Statistics collection
├── tests/
│   ├── test_buffer.c    # Buffer unit tests
│   └── test_sync.c      # Synchronization tests
├── Makefile
└── README.md

5.3 The Core Question You’re Answering

“How do you coordinate multiple threads accessing shared data without losing updates, corrupting state, or deadlocking?”

This is THE central question of concurrent programming. The answer involves:

  1. Mutual exclusion (mutex) - Only one thread modifies at a time
  2. Condition variables - Threads wait efficiently for state changes
  3. The while-loop pattern - Always re-check conditions after waking

5.4 Concepts You Must Understand First

Before coding, verify you understand:

  1. Why do you need a mutex?
    • Multiple threads reading/writing shared variables = race condition
    • Mutex ensures only one thread in critical section at a time
  2. Why condition variables instead of just mutex?
    • Without CV: busy-wait (spin checking condition = waste CPU)
    • With CV: thread sleeps, kernel wakes it when signaled
  3. Why does cond_wait take a mutex parameter?
    • cond_wait atomically: releases mutex AND goes to sleep
    • When woken: re-acquires mutex before returning
    • This prevents lost wakeups
  4. What’s a spurious wakeup?
    • POSIX allows cond_wait to return without being signaled
    • Therefore: ALWAYS use while loop, not if

5.5 Questions to Guide Your Design

  1. Circular buffer or linked list?
    • Circular: Fixed memory, predictable, cache-friendly
    • Linked: Unbounded (not truly bounded buffer), more allocations
  2. One condition variable or two?
    • One: Works only if you broadcast, so every waiter (producer or consumer) wakes on each change
    • Two (not_full, not_empty): More precise, less thundering herd
  3. signal() or broadcast()?
    • signal(): Wakes one waiter (usually sufficient)
    • broadcast(): Wakes all waiters (needed for shutdown)
  4. How to shut down?
    • Poison pill: Special item meaning “exit”
    • Done flag: Set flag, broadcast to all waiters

5.6 Thinking Exercise

Trace a Race Condition:

Without mutex, what happens here?

Thread A (Producer)              Thread B (Consumer)
-------------------              -------------------
read count (count = 5)
                                 read count (count = 5)
                                 if count > 0:
if count < capacity:
  items[tail] = new_item
                                   item = items[head]
                                   head++
  tail++
                                   count = 5 - 1   // writes 4
  count = 5 + 1                  // writes 6, B's decrement is lost

Final: count = 6, but we added one item and removed one.
Should be: 5 + 1 - 1 = 5.

If the two writes land in the other order, the final value is 4 instead.
Either way one update is lost, because both threads computed their new
value from the same stale read (count = 5).

Now trace WITH mutex:

Thread A (Producer)              Thread B (Consumer)
-------------------              -------------------
lock(mutex)
  count = 5
  items[5] = item                lock(mutex)  // BLOCKS
  tail = 6
  count = 6                      // waiting...
unlock(mutex)
                                 // now acquires lock
                                   count = 6
                                   item = items[0]
                                   head = 1
                                   count = 5
                                 unlock(mutex)

Final: count = 5, correct!
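
The lost-update effect traced above can be observed with a small experiment. The sketch below is an assumption-laden demo, not project code: several threads increment one counter, with or without a mutex. The names shared_counter_t, increment_worker and run_counter are hypothetical. The use_lock = 0 path is a deliberate data race (undefined behavior in C11) and exists only to make the problem visible.

```c
#include <pthread.h>

#define MAX_COUNTER_THREADS 16

typedef struct {
    volatile long count;    /* volatile only so every iteration loads and stores */
    pthread_mutex_t mutex;
    int use_lock;           /* 0 = intentionally racy, 1 = protected */
    int iterations;
} shared_counter_t;

static void *increment_worker(void *p) {
    shared_counter_t *s = p;
    for (int i = 0; i < s->iterations; i++) {
        if (s->use_lock)
            pthread_mutex_lock(&s->mutex);
        s->count = s->count + 1;   /* read, add, write: three steps, not one */
        if (s->use_lock)
            pthread_mutex_unlock(&s->mutex);
    }
    return NULL;
}

/* Runs nthreads workers and returns the final counter value (-1 on bad input) */
long run_counter(int nthreads, int iterations, int use_lock) {
    shared_counter_t s = { .count = 0, .use_lock = use_lock,
                           .iterations = iterations };
    pthread_t tids[MAX_COUNTER_THREADS];
    if (nthreads < 1 || nthreads > MAX_COUNTER_THREADS)
        return -1;

    pthread_mutex_init(&s.mutex, NULL);
    int started = 0;
    while (started < nthreads &&
           pthread_create(&tids[started], NULL, increment_worker, &s) == 0)
        started++;
    for (int t = 0; t < started; t++)
        pthread_join(tids[t], NULL);
    pthread_mutex_destroy(&s.mutex);
    return s.count;
}
```

With use_lock = 1 the result equals nthreads * iterations. With use_lock = 0 the result on a multi-core machine is often smaller, and it can vary from run to run.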

5.7 Hints in Layers

Hint 1: Buffer Initialization

buffer_t *buffer_create(int capacity) {
    if (capacity <= 0) return NULL;

    buffer_t *buf = malloc(sizeof(*buf));
    if (buf == NULL) return NULL;

    buf->items = malloc((size_t)capacity * sizeof(*buf->items));
    if (buf->items == NULL) {
        free(buf);              /* Don't leak the header on partial failure */
        return NULL;
    }

    buf->capacity = capacity;
    buf->count = buf->head = buf->tail = 0;
    buf->done = 0;

    pthread_mutex_init(&buf->mutex, NULL);
    pthread_cond_init(&buf->not_full, NULL);
    pthread_cond_init(&buf->not_empty, NULL);

    return buf;
}

Hint 2: Producer Pattern

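/* Returns 1 if the item was queued, 0 if the buffer was shut down first */
int buffer_put(buffer_t *buf, work_item_t item) {
    pthread_mutex_lock(&buf->mutex);

    /* Re-check after every wakeup (spurious wakeups, competing producers) */
    while (buf->count == buf->capacity && !buf->done) {
        pthread_cond_wait(&buf->not_full, &buf->mutex);
    }

    if (buf->done) {
        pthread_mutex_unlock(&buf->mutex);
        return 0;  // Shut down: item was NOT queued
    }

    buf->items[buf->tail] = item;
    buf->tail = (buf->tail + 1) % buf->capacity;
    buf->count++;

    pthread_cond_signal(&buf->not_empty);
    pthread_mutex_unlock(&buf->mutex);
    return 1;  // Item queued
}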

Hint 3: Consumer Pattern

int buffer_get(buffer_t *buf, work_item_t *item) {
    pthread_mutex_lock(&buf->mutex);

    while (buf->count == 0 && !buf->done) {
        pthread_cond_wait(&buf->not_empty, &buf->mutex);
    }

    if (buf->count == 0 && buf->done) {
        pthread_mutex_unlock(&buf->mutex);
        return 0;  // No more items
    }

    *item = buf->items[buf->head];
    buf->head = (buf->head + 1) % buf->capacity;
    buf->count--;

    pthread_cond_signal(&buf->not_full);
    pthread_mutex_unlock(&buf->mutex);
    return 1;  // Got an item
}

Hint 4: Graceful Shutdown

void buffer_shutdown(buffer_t *buf) {
    pthread_mutex_lock(&buf->mutex);
    buf->done = 1;
    pthread_cond_broadcast(&buf->not_full);  // Wake all producers
    pthread_cond_broadcast(&buf->not_empty); // Wake all consumers
    pthread_mutex_unlock(&buf->mutex);
}
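
One detail the hints leave open is when to call the shutdown function. A possible ordering, sketched below, is: join all producers first, then set the done flag and broadcast, then join the consumers. That way no producer sees the flag while it still has items to add. The sketch uses a simplified fixed-size buffer of ints rather than buffer_t, and the names mini_buffer_t, worker_t and run_pipeline are hypothetical. Return values of the pthread_* calls are ignored for brevity.

```c
#include <pthread.h>

#define CAP 8            /* fixed capacity keeps the sketch short */
#define MAX_THREADS 8

typedef struct {
    int items[CAP];
    int count, head, tail, done;
    pthread_mutex_t mutex;
    pthread_cond_t not_full, not_empty;
} mini_buffer_t;

typedef struct {
    mini_buffer_t *buf;
    int n;               /* producer: items to produce; consumer: items consumed */
    long sum;            /* sum of item values, used as a sanity check */
} worker_t;

static void *producer(void *p) {
    worker_t *w = p;
    mini_buffer_t *b = w->buf;
    for (int i = 1; i <= w->n; i++) {
        pthread_mutex_lock(&b->mutex);
        while (b->count == CAP)
            pthread_cond_wait(&b->not_full, &b->mutex);
        b->items[b->tail] = i;
        b->tail = (b->tail + 1) % CAP;
        b->count++;
        pthread_cond_signal(&b->not_empty);
        pthread_mutex_unlock(&b->mutex);
        w->sum += i;
    }
    return NULL;
}

static void *consumer(void *p) {
    worker_t *w = p;
    mini_buffer_t *b = w->buf;
    for (;;) {
        pthread_mutex_lock(&b->mutex);
        while (b->count == 0 && !b->done)
            pthread_cond_wait(&b->not_empty, &b->mutex);
        if (b->count == 0) {             /* done flag set and buffer drained */
            pthread_mutex_unlock(&b->mutex);
            return NULL;
        }
        int item = b->items[b->head];
        b->head = (b->head + 1) % CAP;
        b->count--;
        pthread_cond_signal(&b->not_full);
        pthread_mutex_unlock(&b->mutex);
        w->sum += item;                  /* "process" outside the lock */
        w->n++;
    }
}

/* Returns 1 if the consumed count and sum match what was produced */
int run_pipeline(int nprod, int ncons, int items_each) {
    if (nprod < 1 || nprod > MAX_THREADS || ncons < 1 || ncons > MAX_THREADS)
        return 0;
    mini_buffer_t b = { .count = 0 };
    pthread_t pt[MAX_THREADS], ct[MAX_THREADS];
    worker_t pw[MAX_THREADS], cw[MAX_THREADS];
    long produced_sum = 0, consumed_sum = 0;
    int consumed = 0;

    pthread_mutex_init(&b.mutex, NULL);
    pthread_cond_init(&b.not_full, NULL);
    pthread_cond_init(&b.not_empty, NULL);

    for (int i = 0; i < ncons; i++) {
        cw[i] = (worker_t){ &b, 0, 0 };
        pthread_create(&ct[i], NULL, consumer, &cw[i]);
    }
    for (int i = 0; i < nprod; i++) {
        pw[i] = (worker_t){ &b, items_each, 0 };
        pthread_create(&pt[i], NULL, producer, &pw[i]);
    }

    for (int i = 0; i < nprod; i++) {    /* 1. wait for all producers */
        pthread_join(pt[i], NULL);
        produced_sum += pw[i].sum;
    }
    pthread_mutex_lock(&b.mutex);        /* 2. then announce shutdown */
    b.done = 1;
    pthread_cond_broadcast(&b.not_empty);
    pthread_mutex_unlock(&b.mutex);
    for (int i = 0; i < ncons; i++) {    /* 3. consumers drain and exit */
        pthread_join(ct[i], NULL);
        consumed_sum += cw[i].sum;
        consumed += cw[i].n;
    }

    pthread_cond_destroy(&b.not_empty);
    pthread_cond_destroy(&b.not_full);
    pthread_mutex_destroy(&b.mutex);
    return consumed == nprod * items_each && consumed_sum == produced_sum;
}
```

Matching counts and sums do not prove exactly-once delivery, but a mismatch is a reliable sign of a lost or duplicated item.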

5.8 The Interview Questions They’ll Ask

  1. “Why must you check the condition in a while loop, not an if?”
    • Spurious wakeups: cond_wait can return without signal
    • Stolen wakeup: Another thread got the item first
    • Broadcast: All wake up, only one can proceed
  2. “What’s the difference between pthread_cond_signal and pthread_cond_broadcast?”
    • signal: Wakes at least one waiting thread
    • broadcast: Wakes all waiting threads
    • Use signal for normal operation, broadcast for shutdown
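  3. “Can you have a deadlock with just one mutex?”
    • Yes, if a thread locks a non-recursive mutex it already holds (it typically hangs on itself)
    • Or if a thread returns or exits on an error path without unlocking, so every later locker blocks forever
    • Calling cond_wait without holding the mutex is a different bug: undefined behavior, not a guaranteed deadlock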
  4. “How would you implement a thread-safe queue?”
    • This is exactly what we’re building!
    • Mutex + two condition variables + circular buffer
  5. “What is priority inversion and how do you prevent it?”
    • Low-priority thread holds lock, high-priority thread waits
    • Solutions: priority inheritance, priority ceiling

5.9 Books That Will Help

Topic                    Book                                            Chapter
-----                    ----                                            -------
POSIX threads basics     “APUE” by Stevens & Rago                        Ch. 11
Synchronization          “APUE” by Stevens & Rago                        Ch. 11.6
Advanced patterns        “The Linux Programming Interface” by Kerrisk    Ch. 29-33
Lock-free alternatives   “Rust Atomics and Locks” by Mara Bos            Ch. 1-5

6. Testing Strategy

6.1 Test Categories

Category     Purpose                  Examples
--------     -------                  --------
Unit         Buffer operations        put/get with single thread
Functional   Full producer-consumer   Multiple threads, verify count
Stress       Race conditions          Many threads, many items
Deadlock     Locking issues           helgrind/tsan verification

6.2 Critical Test Cases

# Test 1: Basic correctness
./prodcons -p 1 -c 1 -n 100 -b 10
# Expected: 100 produced, 100 consumed

# Test 2: Multiple threads
./prodcons -p 4 -c 4 -n 10000 -b 50
# Expected: All items accounted for, no races

# Test 3: Buffer pressure (more producers)
./prodcons -p 8 -c 2 -n 10000 -b 5
# Expected: Producers block often, but complete

# Test 4: Starvation test (more consumers)
./prodcons -p 2 -c 8 -n 10000 -b 5
# Expected: Consumers block often, but complete

# Test 5: Thread sanitizer
gcc -pthread -fsanitize=thread -o prodcons prodcons.c
./prodcons -p 4 -c 4 -n 1000
# Expected: No data race warnings

# Test 6: Helgrind (Valgrind thread checker)
valgrind --tool=helgrind ./prodcons -p 2 -c 2 -n 100
# Expected: No errors

# Test 7: Memory leaks
valgrind --leak-check=full ./prodcons -p 2 -c 2 -n 100
# Expected: No leaks

7. Common Pitfalls & Debugging

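Pitfall                                   Symptom                Solution
-------                                   -------                --------
Using if instead of while                 Occasional corruption  Always use while for cond_wait
Forgetting to unlock on error path        Deadlock               Use goto cleanup (C) or RAII (C++/Rust)
Not holding mutex for cond_wait           Undefined behavior     Always lock before wait
Changing the predicate without the lock   Lost wakeups           Update shared state under the mutex, then signal
No shutdown mechanism                     Threads hang forever   Done flag + broadcast
Reading shared data without lock          Race conditions        Lock for ALL shared access

Note: POSIX allows pthread_cond_signal to be called without holding the mutex. Wakeups are lost when the waited-on state (count, done) is modified outside the lock, not because of where the signal call sits.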
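
The "goto cleanup" remedy for the unlock-on-error pitfall can be sketched as follows. Every exit path funnels through a single label that releases the mutex. The guarded_t type, the guarded_add function and the error codes are hypothetical, chosen only to show the shape of the pattern.

```c
#include <pthread.h>

/* Hypothetical shared state with two ways for an add to fail */
typedef struct {
    pthread_mutex_t mutex;
    int count;
    int capacity;
    int done;
} guarded_t;

/* Non-blocking add: returns 0 on success, -1 if shut down, -2 if full.
 * All paths leave through the cleanup label, so the mutex is always released. */
int guarded_add(guarded_t *g) {
    int rc;

    pthread_mutex_lock(&g->mutex);

    if (g->done) {
        rc = -1;
        goto cleanup;
    }
    if (g->count == g->capacity) {
        rc = -2;
        goto cleanup;
    }

    g->count++;
    rc = 0;

cleanup:
    pthread_mutex_unlock(&g->mutex);   /* the only unlock in the function */
    return rc;
}
```

An early "return -1;" in place of either goto would leave the mutex locked, and the next caller would block forever.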

Debugging Techniques:

# See thread activity
strace -f ./prodcons 2>&1 | grep -E 'futex|clone'

# Debug with GDB
gdb ./prodcons
(gdb) set print thread-events on
(gdb) break buffer_put
(gdb) run -p 2 -c 2 -n 10
(gdb) info threads
(gdb) thread 2
(gdb) bt

# Helgrind for race detection
valgrind --tool=helgrind --history-level=full ./prodcons

# Thread sanitizer (compile time)
gcc -fsanitize=thread -g -o prodcons prodcons.c -pthread
./prodcons
# Reports data races with stack traces

8. Extensions & Challenges

8.1 Beginner Extensions

  • Add item processing time (simulate work with usleep)
  • Add per-item latency tracking (time from produce to consume)
  • Implement verbose mode showing buffer state
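
For the latency-tracking extension, the created timestamp in work_item_t can be compared with the time of consumption. A minimal sketch, assuming CLOCK_MONOTONIC is used for both stamps so that wall-clock adjustments do not distort the result. The names stamp_now and elapsed_ms are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <time.h>

/* Record the current monotonic time (call when producing and when consuming) */
void stamp_now(struct timespec *ts) {
    clock_gettime(CLOCK_MONOTONIC, ts);
}

/* Milliseconds from start to end. The nanosecond difference may be
 * negative; adding it to the seconds term handles the borrow. */
double elapsed_ms(const struct timespec *start, const struct timespec *end) {
    return (double)(end->tv_sec - start->tv_sec) * 1e3 +
           (double)(end->tv_nsec - start->tv_nsec) / 1e6;
}
```

A consumer would call stamp_now on a local timespec right after buffer_get returns and pass elapsed_ms(&item.created, &now) to the statistics code.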

8.2 Intermediate Extensions

  • Add priority levels to items (priority queue)
  • Implement multiple queues with work stealing
  • Add timeout to blocking operations (pthread_cond_timedwait)
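
For the timeout extension, note that pthread_cond_timedwait takes an absolute deadline, not a duration. The sketch below waits on a simple ready flag rather than on the project buffer. It assumes a default-initialized condition variable, whose clock is CLOCK_REALTIME. The names flag_t, flag_wait_ms and flag_set are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <pthread.h>
#include <time.h>

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int ready;
} flag_t;

/* Wait up to timeout_ms for ready to become nonzero.
 * Returns 0 if the flag was set, ETIMEDOUT if the deadline passed first. */
int flag_wait_ms(flag_t *f, long timeout_ms) {
    struct timespec deadline;
    clock_gettime(CLOCK_REALTIME, &deadline);    /* now ... */
    deadline.tv_sec  += timeout_ms / 1000;       /* ... plus the timeout */
    deadline.tv_nsec += (timeout_ms % 1000) * 1000000L;
    if (deadline.tv_nsec >= 1000000000L) {       /* normalize the nanoseconds */
        deadline.tv_sec++;
        deadline.tv_nsec -= 1000000000L;
    }

    int rc = 0;
    pthread_mutex_lock(&f->mutex);
    /* Same while-loop rule as cond_wait; the deadline stays fixed across
     * spurious wakeups because it is absolute */
    while (!f->ready && rc == 0)
        rc = pthread_cond_timedwait(&f->cond, &f->mutex, &deadline);
    if (f->ready)
        rc = 0;                                  /* the flag wins over a late timeout */
    pthread_mutex_unlock(&f->mutex);
    return rc;
}

void flag_set(flag_t *f) {
    pthread_mutex_lock(&f->mutex);
    f->ready = 1;
    pthread_cond_broadcast(&f->cond);
    pthread_mutex_unlock(&f->mutex);
}
```

Applied to the buffer, the same loop shape would replace the pthread_cond_wait calls in the put and get functions, with ETIMEDOUT reported to the caller.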

8.3 Advanced Extensions

  • Implement lock-free ring buffer using atomics
  • Add backpressure mechanism (slow down producers when buffer fills)
  • Benchmark against std::queue with mutex (C++) or crossbeam (Rust)
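
As a starting point for the lock-free extension, the sketch below shows a ring buffer built on C11 atomics. It rests on a strong assumption: exactly one producer thread and one consumer thread (SPSC). It is not a drop-in replacement for the multi-producer, multi-consumer buffer in this project, and the names spsc_ring_t, spsc_init, spsc_push and spsc_pop are hypothetical.

```c
#include <stdatomic.h>
#include <stddef.h>

#define RING_SIZE 8   /* must be a power of two so masking wraps the index */

typedef struct {
    int slots[RING_SIZE];
    atomic_size_t head;   /* next slot to read; written only by the consumer */
    atomic_size_t tail;   /* next slot to write; written only by the producer */
} spsc_ring_t;

void spsc_init(spsc_ring_t *r) {
    atomic_init(&r->head, 0);
    atomic_init(&r->tail, 0);
}

/* Producer side. Returns 1 on success, 0 if the ring is full (never blocks). */
int spsc_push(spsc_ring_t *r, int value) {
    size_t tail = atomic_load_explicit(&r->tail, memory_order_relaxed);
    size_t head = atomic_load_explicit(&r->head, memory_order_acquire);
    if (tail - head == RING_SIZE)
        return 0;
    r->slots[tail & (RING_SIZE - 1)] = value;
    /* Release: the slot write above becomes visible before the new tail */
    atomic_store_explicit(&r->tail, tail + 1, memory_order_release);
    return 1;
}

/* Consumer side. Returns 1 and stores the item, or 0 if the ring is empty. */
int spsc_pop(spsc_ring_t *r, int *out) {
    size_t head = atomic_load_explicit(&r->head, memory_order_relaxed);
    size_t tail = atomic_load_explicit(&r->tail, memory_order_acquire);
    if (head == tail)
        return 0;
    *out = r->slots[head & (RING_SIZE - 1)];
    /* Release: the slot read above completes before the slot is reusable */
    atomic_store_explicit(&r->head, head + 1, memory_order_release);
    return 1;
}
```

Unlike the mutex version, push and pop return immediately when the ring is full or empty, so the caller has to decide whether to spin, yield, or fall back to a condition variable.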

9. Resources

9.1 Essential Reading

  • “APUE” Ch. 11 - Threads and synchronization
  • “TLPI” Ch. 30 - Condition variables in depth
  • pthread_cond_wait man page - Critical semantics

9.2 Code References

  • Redis: src/bio.c (background I/O threads)
  • PostgreSQL: src/backend/storage/ipc/
  • nginx: src/core/ngx_thread_pool.c

9.3 Man Pages

man pthread_create
man pthread_mutex_lock
man pthread_cond_wait
man pthread_cond_signal

10. Self-Assessment Checklist

Before considering this project complete, verify:

  • I understand why condition waits must be in a while loop
  • I can explain the difference between signal() and broadcast()
  • My implementation has zero races (verified with tsan/helgrind)
  • All produced items are consumed exactly once
  • Graceful shutdown works (no hung threads)
  • I can explain the mutex parameter to pthread_cond_wait
  • valgrind reports zero memory leaks
  • I can explain why shutdown uses broadcast while normal put/get uses signal
  • I can describe a deadlock scenario and how to prevent it
  • Performance meets target (50K+ items/sec)

11. Submission / Completion Criteria

Minimum Viable Completion:

  • Working producer-consumer with 2+ threads each
  • Mutex and condition variables properly used
  • All items accounted for (no loss/duplication)
  • Clean shutdown

Full Completion:

  • Configurable thread counts and buffer size
  • Statistics collection (throughput, latency)
  • helgrind/tsan clean
  • valgrind clean

Excellence (Going Above & Beyond):

  • Lock-free implementation comparison
  • Work stealing between multiple queues
  • Comprehensive benchmark suite

This guide was generated from project_based_ideas/SYSTEMS_PROGRAMMING/ADVANCED_UNIX_PROGRAMMING_DEEP_DIVE.md. For the complete sprint overview, see the README.md in this directory.