Project 7: Producer-Consumer with POSIX Threads
Build a producer-consumer system with multiple producer and consumer threads sharing a bounded buffer, using mutexes and condition variables for synchronization.
Quick Reference
| Attribute | Value |
|---|---|
| Difficulty | Level 3: Advanced |
| Time Estimate | 1 Week (15-20 hours) |
| Language | C (Alternatives: Rust, Go, C++) |
| Prerequisites | Basic C, understanding of threads concept, memory model basics |
| Key Topics | pthread_create/join, mutexes, condition variables, bounded buffer, race conditions |
1. Learning Objectives
By completing this project, you will:
- Create and manage POSIX threads - Use pthread_create(), pthread_join(), pass arguments and get return values.
- Implement mutex synchronization - Protect shared data with pthread_mutex_lock/unlock.
- Master condition variables - Use pthread_cond_wait/signal/broadcast for thread coordination.
- Build the bounded buffer pattern - The canonical producer-consumer synchronization problem.
- Avoid common threading bugs - Understand spurious wakeups, lost wakeups, and deadlocks.
- Design thread-safe APIs - Think about API design for concurrent access.
2. Theoretical Foundation
2.1 Core Concepts
The Producer-Consumer Problem is the canonical synchronization challenge. Multiple producers create work items, multiple consumers process them, and they share a fixed-size buffer.
                         Bounded Buffer

 Producer 1 ──┐                                    ┌── Consumer 1
              │      ┌───┬───┬───┬───┬───┐         │
 Producer 2 ──┼─────>│ A │ B │ C │   │   │─────────┼── Consumer 2
              │      └───┴───┴───┴───┴───┘         │
 Producer 3 ──┘        ^           ^               └── Consumer 3
                       │           │
                     head        tail
                  (consume)    (produce)
Constraints:
- Buffer has fixed capacity (e.g., 10 slots)
- Producers must WAIT if buffer is full
- Consumers must WAIT if buffer is empty
- Only ONE thread can modify buffer at a time
Why is this hard?
Without synchronization:
- Two producers might write to the same slot (data corruption)
- A consumer might read from an empty buffer (garbage data)
- The count of items might become wrong (lost updates)
- Threads might spin forever (no notification of state changes)
The Solution: Mutex + Condition Variables
                  Synchronization Primitives

┌───────────────┐   Protects all buffer access
│     Mutex     │   Only one thread can hold at a time
└───────────────┘

┌───────────────┐   Producers wait here when buffer FULL
│ CV: not_full  │   Signaled by consumers after taking item
└───────────────┘

┌───────────────┐   Consumers wait here when buffer EMPTY
│ CV: not_empty │   Signaled by producers after adding item
└───────────────┘
2.2 Why This Matters
This pattern appears EVERYWHERE in systems programming:
- Web servers: Request queue between acceptor and worker threads
- Databases: Transaction log buffer between clients and disk writer
- Operating systems: I/O request queues between processes and drivers
- Video processing: Frame buffers between decoder and encoder
- Message queues: RabbitMQ, Kafka, Redis queues - all variations of this
Master this pattern, and you understand the core of most thread synchronization you will encounter.
2.3 The Critical Pattern: While Loop for Condition Wait
THIS IS THE MOST IMPORTANT THING TO UNDERSTAND:
// WRONG - using if
pthread_mutex_lock(&mutex);
if (buffer_empty) {
    pthread_cond_wait(&not_empty, &mutex);
}
// Use item - BUG: might still be empty!
pthread_mutex_unlock(&mutex);

// CORRECT - using while
pthread_mutex_lock(&mutex);
while (buffer_empty) {              // <-- WHILE, not IF
    pthread_cond_wait(&not_empty, &mutex);
}
// Use item - guaranteed not empty
pthread_mutex_unlock(&mutex);
Why while instead of if?
- Spurious wakeups: POSIX allows cond_wait to return even without signal
- Stolen wakeup: Another consumer might grab the item first
- Broadcast: All waiters wake, but only one can proceed
The while loop re-checks the condition after every wakeup.
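To see the pattern end to end, here is a minimal, self-contained sketch (separate from the project itself): a single flag, one worker thread, and the while-loop wait. Compile it with gcc -pthread and experiment with it.

/* Illustrative stand-alone example, not project code. */
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t mutex    = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  ready_cv = PTHREAD_COND_INITIALIZER;
static int ready = 0;                      /* the condition (predicate) */

static void *worker(void *arg) {
    (void)arg;
    pthread_mutex_lock(&mutex);
    ready = 1;                             /* change state under the lock */
    pthread_cond_signal(&ready_cv);        /* then notify waiters */
    pthread_mutex_unlock(&mutex);
    return NULL;
}

int main(void) {
    pthread_t tid;
    pthread_create(&tid, NULL, worker, NULL);

    pthread_mutex_lock(&mutex);
    while (!ready) {                       /* WHILE, not IF: re-check on every wakeup */
        pthread_cond_wait(&ready_cv, &mutex);
    }
    pthread_mutex_unlock(&mutex);

    printf("worker signaled readiness\n");
    pthread_join(tid, NULL);
    return 0;
}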
3. Project Specification
3.1 What You Will Build
A complete producer-consumer implementation with:
- Configurable number of producers and consumers
- Bounded circular buffer with configurable size
- Command-line interface for parameters
- Statistics collection (throughput, latency)
- Graceful shutdown coordination
3.2 Functional Requirements
| Requirement | Description |
|---|---|
| R1 | Multiple producer threads adding items to buffer |
| R2 | Multiple consumer threads removing items from buffer |
| R3 | Bounded buffer with configurable capacity |
| R4 | Proper blocking when buffer full (producers) or empty (consumers) |
| R5 | No lost items, no duplicate consumption |
| R6 | Graceful shutdown - complete pending items, exit cleanly |
| R7 | Thread-safe statistics collection |
3.3 Non-Functional Requirements
- Zero data races (verified with helgrind/tsan)
- Zero deadlocks
- All items produced are consumed exactly once
- Performance: 50,000+ items/second throughput
- Memory: No leaks (verified with valgrind)
3.4 Example Output
# Basic run
$ ./prodcons
Producer-Consumer Demo
Buffer size: 10, Producers: 2, Consumers: 2, Items: 1000
[P0] Produced item 0 (buffer: 1/10)
[P1] Produced item 1 (buffer: 2/10)
[C0] Consumed item 0 from P0 (buffer: 1/10)
[C1] Consumed item 1 from P1 (buffer: 0/10)
[P0] Produced item 2 (buffer: 1/10)
...
Summary:
Items produced: 1000
Items consumed: 1000
Throughput: 45,230 items/sec
Avg latency: 0.022 ms
Max buffer fill: 10 (hit capacity)
# Stress test
$ ./prodcons -p 8 -c 8 -n 100000 -b 100
Producer-Consumer Stress Test
Producers: 8, Consumers: 8, Items: 100000, Buffer: 100
Running... (progress updates)
25000/100000 (25%)
50000/100000 (50%)
75000/100000 (75%)
PASSED: All items accounted for
Produced: 100000
Consumed: 100000
Elapsed: 1.23 seconds
Throughput: 81,300 items/sec
# Verbose mode to see buffer state
$ ./prodcons -v -b 5 -n 10
Buffer visualization enabled
Buffer: [     ] 0/5
[P0] +item1   Buffer: [#    ] 1/5
[P1] +item2   Buffer: [##   ] 2/5
[P0] +item3   Buffer: [###  ] 3/5
[C0] -item1   Buffer: [##   ] 2/5
[C1] -item2   Buffer: [#    ] 1/5
[P1] +item4   Buffer: [##   ] 2/5
...
4. Solution Architecture
4.1 High-Level Design
┌───────────────────────────────────────────────────────────────┐
│                          Main Thread                          │
│ 1. Parse args  2. Init buffer  3. Create threads  4. Join all │
└───────────────────────────────────────────────────────────────┘
                                │
                ┌───────────────┼───────────────┐
                │               │               │
                v               v               v
          ┌───────────┐   ┌───────────┐   ┌───────────┐
          │ Producer  │   │ Producer  │   │ Consumer  │   ...
          │ Thread 0  │   │ Thread 1  │   │ Thread 0  │
          └─────┬─────┘   └─────┬─────┘   └─────┬─────┘
                │               │               │
                └───────────────┴───────────────┘
                                │
                                v
                 ┌─────────────────────────────┐
                 │        Shared Buffer        │
                 │  ┌─────────────────────┐    │
                 │  │ mutex               │    │
                 │  │ not_full  (cond var)│    │
                 │  │ not_empty (cond var)│    │
                 │  │ items[capacity]     │    │
                 │  │ head, tail, count   │    │
                 │  └─────────────────────┘    │
                 └─────────────────────────────┘
4.2 Key Components
| Component | Responsibility | Key Decisions |
|---|---|---|
| Buffer | Hold items, track count | Circular array with head/tail |
| Producer | Generate items, add to buffer | Block when full |
| Consumer | Remove items, process | Block when empty |
| Shutdown | Coordinate clean exit | Poison pill or done flag |
| Statistics | Track throughput/latency | Atomic counters or mutex |
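For the Statistics component, one of the options named in the table is C11 atomic counters, which let producers and consumers record events without touching the buffer mutex. A minimal sketch; the struct and function names are illustrative, not part of the required design:

#include <stdatomic.h>

/* Illustrative stats module; names are placeholders. */
typedef struct {
    atomic_long produced;      /* total items put into the buffer */
    atomic_long consumed;      /* total items taken out */
    atomic_int  max_fill;      /* high-water mark of the buffer count */
} stats_t;

static void stats_on_produce(stats_t *s, int fill_now) {
    atomic_fetch_add(&s->produced, 1);
    int prev = atomic_load(&s->max_fill);
    while (fill_now > prev &&
           !atomic_compare_exchange_weak(&s->max_fill, &prev, fill_now)) {
        /* retry: on failure, prev now holds the latest observed maximum */
    }
}

static void stats_on_consume(stats_t *s) {
    atomic_fetch_add(&s->consumed, 1);
}

Printing the totals from the main thread after all joins needs no extra synchronization, because pthread_join establishes the necessary ordering.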
4.3 Data Structures
/* Work item - what producers create and consumers process */
typedef struct {
    int id;                    /* Unique item ID */
    int producer_id;           /* Who produced it */
    struct timespec created;   /* When produced (for latency) */
} work_item_t;

/* Thread-safe bounded buffer */
typedef struct {
    work_item_t *items;        /* Circular buffer array */
    int capacity;              /* Maximum items */
    int count;                 /* Current items in buffer */
    int head;                  /* Next position to consume */
    int tail;                  /* Next position to produce */
    pthread_mutex_t mutex;     /* Protects all fields */
    pthread_cond_t not_full;   /* Signal when space available */
    pthread_cond_t not_empty;  /* Signal when items available */
    int done;                  /* Shutdown flag */
} buffer_t;

/* Per-thread context */
typedef struct {
    int thread_id;
    buffer_t *buffer;
    int items_to_produce;      /* For producers */
    int items_produced;        /* Stats */
    int items_consumed;        /* Stats */
} thread_ctx_t;
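A teardown counterpart rounds out the buffer API. A minimal sketch, assuming the fields above and that every thread has already been joined before it is called (the function name mirrors buffer_create from the hints but is otherwise an assumption):

/* Free a buffer created by buffer_create() (see Hint 1 in section 5.7).
 * The caller must ensure no thread is still using the buffer. */
void buffer_destroy(buffer_t *buf) {
    if (buf == NULL)
        return;
    pthread_mutex_destroy(&buf->mutex);
    pthread_cond_destroy(&buf->not_full);
    pthread_cond_destroy(&buf->not_empty);
    free(buf->items);
    free(buf);
}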
4.4 Algorithm: Producer
producer(ctx):
    for i in range(ctx.items_to_produce):
        item = create_item(i, ctx.thread_id)

        mutex_lock(buffer.mutex)

        // Wait while buffer is full
        while (buffer.count == buffer.capacity AND NOT buffer.done):
            cond_wait(buffer.not_full, buffer.mutex)

        if (buffer.done):
            mutex_unlock(buffer.mutex)
            break

        // Add item to buffer
        buffer.items[buffer.tail] = item
        buffer.tail = (buffer.tail + 1) % buffer.capacity
        buffer.count++

        // Signal consumers
        cond_signal(buffer.not_empty)
        mutex_unlock(buffer.mutex)

        ctx.items_produced++

    return ctx.items_produced
4.5 Algorithm: Consumer
consumer(ctx):
    while (true):
        mutex_lock(buffer.mutex)

        // Wait while buffer is empty
        while (buffer.count == 0 AND NOT buffer.done):
            cond_wait(buffer.not_empty, buffer.mutex)

        // Check for shutdown with empty buffer
        if (buffer.count == 0 AND buffer.done):
            mutex_unlock(buffer.mutex)
            break

        // Remove item from buffer
        item = buffer.items[buffer.head]
        buffer.head = (buffer.head + 1) % buffer.capacity
        buffer.count--

        // Signal producers
        cond_signal(buffer.not_full)
        mutex_unlock(buffer.mutex)

        // Process item (outside lock!)
        process(item)
        ctx.items_consumed++

    return ctx.items_consumed
5. Implementation Guide
5.1 Development Environment Setup
# Install dependencies
sudo apt-get install build-essential valgrind
# Compile with thread support
gcc -pthread -Wall -Wextra -o prodcons prodcons.c
# For thread sanitizer (find races)
gcc -pthread -fsanitize=thread -o prodcons prodcons.c
5.2 Project Structure
prodcons/
├── src/
│ ├── main.c # Entry point, argument parsing
│ ├── buffer.c # Buffer implementation
│ ├── buffer.h # Buffer API
│ ├── producer.c # Producer thread function
│ ├── consumer.c # Consumer thread function
│ └── stats.c # Statistics collection
├── tests/
│ ├── test_buffer.c # Buffer unit tests
│ └── test_sync.c # Synchronization tests
├── Makefile
└── README.md
5.3 The Core Question You’re Answering
“How do you coordinate multiple threads accessing shared data without losing updates, corrupting state, or deadlocking?”
This is THE central question of concurrent programming. The answer involves:
- Mutual exclusion (mutex) - Only one thread modifies at a time
- Condition variables - Threads wait efficiently for state changes
- The while-loop pattern - Always re-check conditions after waking
5.4 Concepts You Must Understand First
Before coding, verify you understand:
- Why do you need a mutex?
- Multiple threads reading/writing shared variables = race condition
- Mutex ensures only one thread in critical section at a time
- Why condition variables instead of just mutex?
- Without CV: busy-wait (spin checking condition = waste CPU)
- With CV: thread sleeps, kernel wakes it when signaled
- Why does cond_wait take a mutex parameter?
- cond_wait atomically: releases mutex AND goes to sleep
- When woken: re-acquires mutex before returning
- This prevents lost wakeups
- What’s a spurious wakeup?
- POSIX allows cond_wait to return without being signaled
- Therefore: ALWAYS use while loop, not if
5.5 Questions to Guide Your Design
- Circular buffer or linked list?
- Circular: Fixed memory, predictable, cache-friendly
- Linked: Unbounded (not truly bounded buffer), more allocations
- One condition variable or two?
- One: Works but all threads wake on every signal
- Two (not_full, not_empty): More precise, less thundering herd
- signal() or broadcast()?
- signal(): Wakes one waiter (usually sufficient)
- broadcast(): Wakes all waiters (needed for shutdown)
- How to shut down?
- Poison pill: Special item meaning “exit”
- Done flag: Set flag, broadcast to all waiters
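If you go the poison-pill route instead of the done flag used later in the hints (section 5.7), the usual approach is to enqueue one sentinel item per consumer once all real work has been produced. A sketch; the sentinel convention and helper name are assumptions, and it relies on the buffer_put/buffer_get API from the hints:

#define POISON_ID (-1)                        /* sentinel: "no more work" */

/* Typically called by main() after joining all producers:
 * one pill per consumer thread guarantees each consumer sees one. */
void send_poison_pills(buffer_t *buf, int num_consumers) {
    for (int i = 0; i < num_consumers; i++) {
        work_item_t pill = { .id = POISON_ID, .producer_id = -1 };
        buffer_put(buf, pill);                /* blocks if the buffer is full */
    }
}

/* Consumer side: exit the loop when a pill is seen.
 *
 *     while (buffer_get(buf, &item)) {
 *         if (item.id == POISON_ID)
 *             break;
 *         process(&item);
 *     }
 */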
5.6 Thinking Exercise
Trace a Race Condition:
Without mutex, what happens here?
Thread A (Producer)              Thread B (Consumer)
-------------------              -------------------
read count (count = 5)
                                 read count (count = 5)
                                 if count > 0:
if count < capacity:
items[tail] = new_item
                                 item = items[head]
                                 head++
tail++
                                 count--   // count = 4
count++   // count = 5
Final: count = 5, but we added one and removed one!
Should be: 5 + 1 - 1 = 5 (correct by accident)
But if timing differs:
Thread A writes count = 6
Thread B writes count = 4
Final could be 4 or 6 depending on who writes last!
Now trace WITH mutex:
Thread A (Producer)              Thread B (Consumer)
-------------------              -------------------
lock(mutex)
count = 5
items[5] = item                  lock(mutex)   // BLOCKS
tail = 6
count = 6                        // waiting...
unlock(mutex)
                                 // now acquires lock
                                 count = 6
                                 item = items[0]
                                 head = 1
                                 count = 5
                                 unlock(mutex)
Final: count = 5, correct!
5.7 Hints in Layers
Hint 1: Buffer Initialization
buffer_t *buffer_create(int capacity) {
    buffer_t *buf = malloc(sizeof(buffer_t));
    buf->items = malloc(capacity * sizeof(work_item_t));
    buf->capacity = capacity;
    buf->count = buf->head = buf->tail = 0;
    buf->done = 0;
    pthread_mutex_init(&buf->mutex, NULL);
    pthread_cond_init(&buf->not_full, NULL);
    pthread_cond_init(&buf->not_empty, NULL);
    return buf;
}
Hint 2: Producer Pattern
void buffer_put(buffer_t *buf, work_item_t item) {
    pthread_mutex_lock(&buf->mutex);
    while (buf->count == buf->capacity && !buf->done) {
        pthread_cond_wait(&buf->not_full, &buf->mutex);
    }
    if (buf->done) {
        pthread_mutex_unlock(&buf->mutex);
        return;
    }
    buf->items[buf->tail] = item;
    buf->tail = (buf->tail + 1) % buf->capacity;
    buf->count++;
    pthread_cond_signal(&buf->not_empty);
    pthread_mutex_unlock(&buf->mutex);
}
Hint 3: Consumer Pattern
int buffer_get(buffer_t *buf, work_item_t *item) {
    pthread_mutex_lock(&buf->mutex);
    while (buf->count == 0 && !buf->done) {
        pthread_cond_wait(&buf->not_empty, &buf->mutex);
    }
    if (buf->count == 0 && buf->done) {
        pthread_mutex_unlock(&buf->mutex);
        return 0;   // No more items
    }
    *item = buf->items[buf->head];
    buf->head = (buf->head + 1) % buf->capacity;
    buf->count--;
    pthread_cond_signal(&buf->not_full);
    pthread_mutex_unlock(&buf->mutex);
    return 1;   // Got an item
}
Hint 4: Graceful Shutdown
void buffer_shutdown(buffer_t *buf) {
    pthread_mutex_lock(&buf->mutex);
    buf->done = 1;
    pthread_cond_broadcast(&buf->not_full);   // Wake all producers
    pthread_cond_broadcast(&buf->not_empty);  // Wake all consumers
    pthread_mutex_unlock(&buf->mutex);
}
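Hint 5: Thread Lifecycle (illustrative sketch). The hints above cover the buffer; the sketch below shows one way main() could wire everything together, assuming the buffer API above, the buffer_destroy() sketch from section 4.3, and thin producer_thread()/consumer_thread() wrappers. Thread counts and the 500-items-per-producer figure are placeholders, not part of the spec. Note the shutdown order: join producers first, then call buffer_shutdown(), then join consumers so they can drain any remaining items.

#include <pthread.h>
#include "buffer.h"   /* assumed to declare buffer_t, work_item_t, thread_ctx_t */

#define NUM_PRODUCERS 2
#define NUM_CONSUMERS 2

/* Thread entry points: thin wrappers around Hints 2 and 3. */
void *producer_thread(void *arg) {
    thread_ctx_t *ctx = arg;
    for (int i = 0; i < ctx->items_to_produce; i++) {
        work_item_t item = { .id = i, .producer_id = ctx->thread_id };
        /* fill item.created here if you track latency */
        buffer_put(ctx->buffer, item);
        ctx->items_produced++;
    }
    return NULL;
}

void *consumer_thread(void *arg) {
    thread_ctx_t *ctx = arg;
    work_item_t item;
    while (buffer_get(ctx->buffer, &item)) {
        /* process(item) would go here, outside the buffer lock */
        ctx->items_consumed++;
    }
    return NULL;
}

int main(void) {
    buffer_t *buf = buffer_create(10);
    pthread_t producers[NUM_PRODUCERS], consumers[NUM_CONSUMERS];
    thread_ctx_t pctx[NUM_PRODUCERS], cctx[NUM_CONSUMERS];

    for (int i = 0; i < NUM_CONSUMERS; i++) {
        cctx[i] = (thread_ctx_t){ .thread_id = i, .buffer = buf };
        pthread_create(&consumers[i], NULL, consumer_thread, &cctx[i]);
    }
    for (int i = 0; i < NUM_PRODUCERS; i++) {
        pctx[i] = (thread_ctx_t){ .thread_id = i, .buffer = buf,
                                  .items_to_produce = 500 };
        pthread_create(&producers[i], NULL, producer_thread, &pctx[i]);
    }

    for (int i = 0; i < NUM_PRODUCERS; i++)      /* 1. wait for producers */
        pthread_join(producers[i], NULL);
    buffer_shutdown(buf);                        /* 2. wake blocked consumers */
    for (int i = 0; i < NUM_CONSUMERS; i++)      /* 3. let them drain, then join */
        pthread_join(consumers[i], NULL);

    buffer_destroy(buf);                         /* see the sketch in 4.3 */
    return 0;
}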
5.8 The Interview Questions They’ll Ask
- “Why must you check the condition in a while loop, not an if?”
- Spurious wakeups: cond_wait can return without signal
- Stolen wakeup: Another thread got the item first
- Broadcast: All wake up, only one can proceed
- “What’s the difference between pthread_cond_signal and pthread_cond_broadcast?”
- signal: Wakes at least one waiting thread
- broadcast: Wakes all waiting threads
- Use signal for normal operation, broadcast for shutdown
- “Can you have a deadlock with just one mutex?”
- Yes, if you try to lock it twice from the same thread (unless recursive)
- Or if cond_wait is called without holding the mutex
- “How would you implement a thread-safe queue?”
- This is exactly what we’re building!
- Mutex + two condition variables + circular buffer
- “What is priority inversion and how do you prevent it?”
- Low-priority thread holds lock, high-priority thread waits
- Solutions: priority inheritance, priority ceiling
5.9 Books That Will Help
| Topic | Book | Chapter |
|---|---|---|
| POSIX threads basics | “APUE” by Stevens & Rago | Ch. 11 |
| Synchronization | “APUE” by Stevens & Rago | Ch. 11.6 |
| Advanced patterns | “The Linux Programming Interface” by Kerrisk | Ch. 29-33 |
| Lock-free alternatives | “Rust Atomics and Locks” by Mara Bos | Ch. 1-5 |
6. Testing Strategy
6.1 Test Categories
| Category | Purpose | Examples |
|---|---|---|
| Unit | Buffer operations | put/get with single thread |
| Functional | Full producer-consumer | Multiple threads, verify count |
| Stress | Race conditions | Many threads, many items |
| Deadlock | Locking issues | helgrind/tsan verification |
6.2 Critical Test Cases
# Test 1: Basic correctness
./prodcons -p 1 -c 1 -n 100 -b 10
# Expected: 100 produced, 100 consumed
# Test 2: Multiple threads
./prodcons -p 4 -c 4 -n 10000 -b 50
# Expected: All items accounted for, no races
# Test 3: Buffer pressure (more producers)
./prodcons -p 8 -c 2 -n 10000 -b 5
# Expected: Producers block often, but complete
# Test 4: Starvation test (more consumers)
./prodcons -p 2 -c 8 -n 10000 -b 5
# Expected: Consumers block often, but complete
# Test 5: Thread sanitizer
gcc -pthread -fsanitize=thread -o prodcons prodcons.c
./prodcons -p 4 -c 4 -n 1000
# Expected: No data race warnings
# Test 6: Helgrind (Valgrind thread checker)
valgrind --tool=helgrind ./prodcons -p 2 -c 2 -n 100
# Expected: No errors
# Test 7: Memory leaks
valgrind --leak-check=full ./prodcons -p 2 -c 2 -n 100
# Expected: No leaks
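Before the multi-threaded tests, a single-threaded unit test of the buffer (tests/test_buffer.c) catches index and count bugs cheaply. A minimal sketch, assuming the buffer API from section 5.7 plus the buffer_destroy() sketch from 4.3:

#include <assert.h>
#include <stdio.h>
#include "buffer.h"

int main(void) {
    buffer_t *buf = buffer_create(4);
    work_item_t in, out;

    /* Fill to capacity, then drain, checking FIFO order. */
    for (int i = 0; i < 4; i++) {
        in.id = i;
        in.producer_id = 0;
        buffer_put(buf, in);
    }
    for (int i = 0; i < 4; i++) {
        assert(buffer_get(buf, &out) == 1);
        assert(out.id == i);               /* items come out in order */
    }

    /* After shutdown with an empty buffer, get() must return 0. */
    buffer_shutdown(buf);
    assert(buffer_get(buf, &out) == 0);

    buffer_destroy(buf);
    printf("test_buffer: all assertions passed\n");
    return 0;
}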
7. Common Pitfalls & Debugging
| Pitfall | Symptom | Solution |
|---|---|---|
| Using if instead of while | Occasional corruption | Always use while for cond_wait |
| Forgetting to unlock on error path | Deadlock | Use goto cleanup or RAII (see sketch below) |
| Not holding mutex for cond_wait | Undefined behavior | Always lock before wait |
| Signaling without holding lock | Lost wakeups | Signal while holding lock |
| No shutdown mechanism | Threads hang forever | Done flag + broadcast |
| Reading shared data without lock | Race conditions | Lock for ALL shared access |
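For the unlock-on-error-path pitfall, the usual C idiom is a single cleanup label so every exit path releases the mutex exactly once. A sketch; the _checked variant and its validation step are hypothetical, not part of the required API:

/* 0 = success, negative = error; the mutex is released on every path. */
int buffer_put_checked(buffer_t *buf, work_item_t item) {
    int rc = 0;
    pthread_mutex_lock(&buf->mutex);
    while (buf->count == buf->capacity && !buf->done)
        pthread_cond_wait(&buf->not_full, &buf->mutex);
    if (buf->done) {
        rc = -1;                        /* shutting down */
        goto cleanup;
    }
    if (item.id < 0) {                  /* hypothetical validation failure */
        rc = -2;
        goto cleanup;
    }
    buf->items[buf->tail] = item;
    buf->tail = (buf->tail + 1) % buf->capacity;
    buf->count++;
    pthread_cond_signal(&buf->not_empty);
cleanup:
    pthread_mutex_unlock(&buf->mutex);  /* every path unlocks exactly once */
    return rc;
}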
Debugging Techniques:
# See thread activity
strace -f ./prodcons 2>&1 | grep -E 'futex|clone'
# Debug with GDB
gdb ./prodcons
(gdb) set print thread-events on
(gdb) break buffer_put
(gdb) run -p 2 -c 2 -n 10
(gdb) info threads
(gdb) thread 2
(gdb) bt
# Helgrind for race detection
valgrind --tool=helgrind --history-level=full ./prodcons
# Thread sanitizer (compile time)
gcc -fsanitize=thread -g -o prodcons prodcons.c -pthread
./prodcons
# Reports data races with stack traces
8. Extensions & Challenges
8.1 Beginner Extensions
- Add item processing time (simulate work with usleep)
- Add per-item latency tracking (time from produce to consume; see the helper sketch after this list)
- Implement verbose mode showing buffer state
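For the latency-tracking extension, the created timestamp in work_item_t is filled at produce time and compared against the clock at consume time. A helper sketch; the function name is an assumption:

#include <time.h>

/* Milliseconds elapsed between the item's creation timestamp and "now".
 * Use CLOCK_MONOTONIC both here and when filling item->created. */
double item_latency_ms(const work_item_t *item) {
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);
    double sec  = (double)(now.tv_sec  - item->created.tv_sec);
    double nsec = (double)(now.tv_nsec - item->created.tv_nsec);
    return sec * 1000.0 + nsec / 1e6;
}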
8.2 Intermediate Extensions
- Add priority levels to items (priority queue)
- Implement multiple queues with work stealing
- Add timeout to blocking operations (pthread_cond_timedwait)
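For the timeout extension, pthread_cond_timedwait() takes an absolute deadline, measured against CLOCK_REALTIME unless the condition variable was created with a different clock attribute. A sketch of a timed put, assuming the buffer from the hints; the function name and return convention are assumptions:

#include <errno.h>
#include <time.h>

/* Returns 1 on success, 0 if the buffer stayed full past the timeout
 * or the buffer is shutting down. Illustrative helper, not required API. */
int buffer_put_timed(buffer_t *buf, work_item_t item, int timeout_ms) {
    struct timespec deadline;
    clock_gettime(CLOCK_REALTIME, &deadline);             /* absolute time */
    deadline.tv_sec  += timeout_ms / 1000;
    deadline.tv_nsec += (long)(timeout_ms % 1000) * 1000000L;
    if (deadline.tv_nsec >= 1000000000L) {                /* carry */
        deadline.tv_sec++;
        deadline.tv_nsec -= 1000000000L;
    }

    pthread_mutex_lock(&buf->mutex);
    while (buf->count == buf->capacity && !buf->done) {
        int rc = pthread_cond_timedwait(&buf->not_full, &buf->mutex, &deadline);
        if (rc == ETIMEDOUT) {
            pthread_mutex_unlock(&buf->mutex);            /* mutex is held again here */
            return 0;                                     /* gave up */
        }
    }
    if (buf->done) {
        pthread_mutex_unlock(&buf->mutex);
        return 0;
    }
    buf->items[buf->tail] = item;
    buf->tail = (buf->tail + 1) % buf->capacity;
    buf->count++;
    pthread_cond_signal(&buf->not_empty);
    pthread_mutex_unlock(&buf->mutex);
    return 1;
}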
8.3 Advanced Extensions
- Implement lock-free ring buffer using atomics (a single-producer/single-consumer starting point is sketched after this list)
- Add backpressure mechanism (slow down producers when buffer fills)
- Benchmark against std::queue with mutex (C++) or crossbeam (Rust)
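For the lock-free extension, a general multi-producer/multi-consumer queue is a hard problem; a single-producer, single-consumer (SPSC) ring with C11 atomics is a realistic first step. A sketch under that assumption; capacity must be a power of two so the monotonically increasing indices wrap cleanly, and the names are illustrative:

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define RING_CAP 1024                  /* must be a power of two */

typedef struct {
    int slots[RING_CAP];
    _Atomic size_t head;               /* read index, advanced by the consumer  */
    _Atomic size_t tail;               /* write index, advanced by the producer */
} spsc_ring_t;

/* Producer only. Returns false if the ring is full. */
bool spsc_push(spsc_ring_t *r, int value) {
    size_t tail = atomic_load_explicit(&r->tail, memory_order_relaxed);
    size_t head = atomic_load_explicit(&r->head, memory_order_acquire);
    if (tail - head == RING_CAP)
        return false;                                   /* full */
    r->slots[tail % RING_CAP] = value;
    atomic_store_explicit(&r->tail, tail + 1, memory_order_release);
    return true;
}

/* Consumer only. Returns false if the ring is empty. */
bool spsc_pop(spsc_ring_t *r, int *out) {
    size_t head = atomic_load_explicit(&r->head, memory_order_relaxed);
    size_t tail = atomic_load_explicit(&r->tail, memory_order_acquire);
    if (tail == head)
        return false;                                   /* empty */
    *out = r->slots[head % RING_CAP];
    atomic_store_explicit(&r->head, head + 1, memory_order_release);
    return true;
}

Each release store publishes the slot access it follows, and the matching acquire load on the other side makes it visible. Unlike the mutex version, this ring never blocks: callers must poll, back off, or layer their own notification on top.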
9. Resources
9.1 Essential Reading
- “APUE” Ch. 11 - Threads and synchronization
- “TLPI” Ch. 30 - Condition variables in depth
- pthread_cond_wait man page - Critical semantics
9.2 Code References
- Redis: src/bio.c (background I/O threads)
- PostgreSQL: src/backend/storage/ipc/
- nginx: src/os/unix/ngx_thread_pool.c
9.3 Man Pages
man pthread_create
man pthread_mutex_lock
man pthread_cond_wait
man pthread_cond_signal
10. Self-Assessment Checklist
Before considering this project complete, verify:
- I understand why condition waits must be in a while loop
- I can explain the difference between signal() and broadcast()
- My implementation has zero races (verified with tsan/helgrind)
- All produced items are consumed exactly once
- Graceful shutdown works (no hung threads)
- I can explain the mutex parameter to pthread_cond_wait
- valgrind reports zero memory leaks
- I understand when to use broadcast vs signal
- I can describe a deadlock scenario and how to prevent it
- Performance meets target (50K+ items/sec)
11. Submission / Completion Criteria
Minimum Viable Completion:
- Working producer-consumer with 2+ threads each
- Mutex and condition variables properly used
- All items accounted for (no loss/duplication)
- Clean shutdown
Full Completion:
- Configurable thread counts and buffer size
- Statistics collection (throughput, latency)
- helgrind/tsan clean
- valgrind clean
Excellence (Going Above & Beyond):
- Lock-free implementation comparison
- Work stealing between multiple queues
- Comprehensive benchmark suite
This guide was generated from project_based_ideas/SYSTEMS_PROGRAMMING/ADVANCED_UNIX_PROGRAMMING_DEEP_DIVE.md. For the complete sprint overview, see the README.md in this directory.