Project 2: Promise-Based Task Coordinator
Build a task execution system where jobs can depend on other jobs, using
std::future and std::promise to coordinate completion and pass results between tasks.
Quick Reference
| Attribute | Value |
|---|---|
| Language | C++ (alt: Rust, JavaScript, Go) |
| Difficulty | Intermediate |
| Time | 1-2 weeks |
| Prerequisites | Project 1, move semantics |
| Coolness | Genuinely Clever |
| Portfolio Value | Resume Gold |
Learning Objectives
By completing this project, you will:
- Master the future/promise paradigm: Understand that a std::promise is one end of a one-shot channel, and std::future is the other end
- Implement dependency-based task scheduling: Build task graphs where tasks execute only when their dependencies complete
- Propagate exceptions across threads: Use promise::set_exception() to transmit errors through the async boundary
- Handle diamond dependencies with shared_future: Understand why std::shared_future exists and when multiple consumers need the same result
- Avoid future-related deadlocks: Recognize that calling future::get() from the wrong thread can block forever
- Design task cancellation strategies: Implement cascading cancellation when upstream tasks fail
- Compare with callback-based approaches: Understand why futures compose better than nested callbacks
Theoretical Foundation
Core Concepts
Futures and Promises as One-Shot Channels
A std::promise<T> and std::future<T> together form a one-time communication channel between threads. The promise is the “write end” and the future is the “read end”:
Producer Thread Consumer Thread
| |
std::promise<T> p; std::future<T> f = p.get_future();
| |
[compute result] |
| |
p.set_value(result); ---------> T val = f.get(); // blocks until ready
| |
[done] [has val]
Key insight: Unlike a queue or channel, a future can only be “read” once. After you call get(), the future is empty. This is fundamentally different from observable patterns or reactive streams.
The Promise Contract
A promise makes exactly one of these guarantees:
- A value will be set via set_value()
- An exception will be set via set_exception()
- The promise will be destroyed without setting anything (broken promise exception)
std::promise<int> p;
std::future<int> f = p.get_future();
// Option 1: Success
p.set_value(42);
// Option 2: Failure
p.set_exception(std::make_exception_ptr(std::runtime_error("failed")));
// Option 3: Broken promise (if p is destroyed without calling either)
// f.get() will throw std::future_error with broken_promise
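The three outcomes can be observed from the consumer side. The following is a minimal sketch, not part of the project API: the helper names describe, success_case, failure_case, and broken_case are made up for illustration.

```cpp
#include <exception>
#include <future>
#include <stdexcept>
#include <string>
#include <utility>

// Reports how a future resolved: a value, a stored exception, or a broken promise.
std::string describe(std::future<int> f) {
    try {
        return "value " + std::to_string(f.get());
    } catch (const std::future_error& e) {
        if (e.code() == std::future_errc::broken_promise) return "broken promise";
        return "other future_error";
    } catch (const std::exception& e) {
        return std::string("exception: ") + e.what();
    }
}

std::string success_case() {
    std::promise<int> p;
    std::future<int> f = p.get_future();
    p.set_value(42);
    return describe(std::move(f));
}

std::string failure_case() {
    std::promise<int> p;
    std::future<int> f = p.get_future();
    p.set_exception(std::make_exception_ptr(std::runtime_error("failed")));
    return describe(std::move(f));
}

std::string broken_case() {
    std::future<int> f;
    {
        std::promise<int> p;
        f = p.get_future();
    }  // p is destroyed here without a value or an exception
    return describe(std::move(f));
}
```

In all three cases get() returns or throws promptly; none of them leaves the consumer blocked.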
std::async vs Manual Promises
std::async is a convenience that creates a promise, spawns a thread (maybe), and returns the future:
// Using std::async
std::future<int> f = std::async(std::launch::async, []() {
return expensive_computation();
});
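// Roughly equivalent manual approach (unlike a std::async future,
// this future's destructor does not block waiting for the thread)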
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread t([p = std::move(p)]() mutable {
try {
p.set_value(expensive_computation());
} catch (...) {
p.set_exception(std::current_exception());
}
});
t.detach();
When to use which:
- std::async: Simple one-off async computations
- Manual promises: Complex dependency graphs, custom thread pools, integration with existing infrastructure
std::shared_future for Multiple Consumers
A regular std::future can only be moved, not copied. What if two tasks depend on the same upstream result?
Task A
/ \
(future) (future) <-- Problem: future is move-only!
| |
Task B Task C
Solution: std::shared_future can be copied and read multiple times:
std::promise<int> p;
std::shared_future<int> sf = p.get_future().share();
// Now both threads can safely call sf.get()
auto b = std::async([sf]() { return sf.get() * 2; });
auto c = std::async([sf]() { return sf.get() + 10; });
Critical difference: std::shared_future::get() returns a const T&, not T. The value is shared, not moved.
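To see the fan-out end to end, here is a small sketch in which one promise feeds two consumers. The function name fan_out is hypothetical; the two lambdas mirror Task B and Task C from the diagram.

```cpp
#include <future>
#include <utility>

// One upstream value, two downstream readers sharing the same shared state.
std::pair<int, int> fan_out(int upstream) {
    std::promise<int> p;
    std::shared_future<int> sf = p.get_future().share();

    // Each lambda captures its own copy of the shared_future.
    auto b = std::async(std::launch::async, [sf] { return sf.get() * 2; });
    auto c = std::async(std::launch::async, [sf] { return sf.get() + 10; });

    p.set_value(upstream);  // wakes both consumers
    return {b.get(), c.get()};
}
```

Capturing sf by value matters: each thread should work on its own copy of the shared_future rather than sharing one object by reference.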
Why This Matters
Problem 1: Callback Hell
Without futures, expressing “do A, then B, then C” requires nested callbacks:
// Callback approach (hard to read, error-prone)
download_file(url, [](Result r1) {
if (r1.success()) {
parse_json(r1.data, [](Result r2) {
if (r2.success()) {
validate(r2.json, [](Result r3) {
// ... and so on
});
}
});
}
});
// Future approach (linear, composable)
// Note: .then() is illustrative; std::future has no such member
auto f1 = download_file_async(url);
auto f2 = f1.then([](auto data) { return parse_json(data); });
auto f3 = f2.then([](auto json) { return validate(json); });
Problem 2: Shared State Complexity
Without futures, coordinating completion requires manual synchronization:
// Manual approach (error-prone)
std::mutex m;
std::condition_variable cv;
bool ready = false;
int result;
// Producer
{
std::lock_guard lk(m);
result = compute();
ready = true;
}
cv.notify_one();
// Consumer
{
std::unique_lock lk(m);
cv.wait(lk, [&]{ return ready; });
use(result);
}
// Future approach (encapsulated)
auto f = std::async(std::launch::async, compute);
use(f.get());
Problem 3: Error Propagation
Exceptions thrown in one thread must somehow reach the waiting thread. Futures handle this automatically:
auto f = std::async(std::launch::async, []() {
throw std::runtime_error("something went wrong");
return 42;
});
try {
int result = f.get(); // Re-throws the exception here!
} catch (const std::runtime_error& e) {
std::cerr << "Caught: " << e.what() << "\n";
}
Historical Context
Futures and promises have a rich history:
- 1977: Baker and Hewitt describe “futures” in the context of message-passing concurrency
- 1985: MultiLisp implements futures for parallel Lisp computation
- 2004-2010: Java gets java.util.concurrent.Future (Java 5), C# gets Task<T> (.NET 4)
- 2011: C++11 introduces std::future, std::promise, std::shared_future, std::async
- 2016: The Concurrency TS proposes std::experimental::future::then, when_all, and when_any (never merged into the standard)
- 2020: C++20 introduces coroutines (co_await can be made to work with futures through a custom awaiter)
Why C++ futures are “low-level”: Unlike JavaScript Promises or C# Tasks, std::future has no built-in .then() chaining (composable asynchrony arrives only with the std::execution sender/receiver framework, adopted for C++26). This project teaches you to build that yourself.
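As a first taste of building chaining yourself, the sketch below layers a continuation helper on top of std::async. The name then_async is hypothetical, and the approach is deliberately naive: it is one possible starting point, not how the coordinator in this project schedules work.

```cpp
#include <cstddef>
#include <future>
#include <string>
#include <type_traits>
#include <utility>

// Runs fn on the result of fut once it is ready and returns a future for fn's result.
template <typename T, typename F>
auto then_async(std::future<T> fut, F fn)
    -> std::future<std::invoke_result_t<F, T>> {
    return std::async(std::launch::async,
                      [fut = std::move(fut), fn = std::move(fn)]() mutable {
                          return fn(fut.get());  // blocks this helper thread only
                      });
}

// Usage: a three-stage chain written linearly.
int chained_length() {
    auto f1 = std::async(std::launch::async, [] { return std::string("payload"); });
    auto f2 = then_async(std::move(f1), [](std::string s) { return s.size(); });
    auto f3 = then_async(std::move(f2),
                         [](std::size_t n) { return static_cast<int>(n) * 2; });
    return f3.get();
}
```

Each stage here occupies a thread while it waits on the previous stage. Avoiding that cost is one motivation for the ready-queue design used later in this project, where a task is only handed to a worker after its inputs exist.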
Common Misconceptions
Misconception 1: “std::async always spawns a thread”
Reality: With std::launch::deferred, the computation runs lazily on the thread that calls get(). The default policy is std::launch::async | std::launch::deferred, which lets the implementation choose either.
Misconception 2: “I can call get() multiple times on a future”
Reality: std::future::get() can only be called once. Use std::shared_future for multiple reads.
Misconception 3: “Futures are like callbacks”
Reality: Futures are values representing eventual results. They can be stored, passed around, and composed. Callbacks are functions that get invoked.
Misconception 4: “If the producer thread dies, the future hangs forever”
Reality: If the promise is destroyed without setting a value, get() throws std::future_error with broken_promise.
Misconception 5: “I should always use async over manual promises”
Reality: Manual promises give you control over thread placement and lifecycle. Essential for thread pools and task graphs.
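Two of these points are easy to check directly. The sketch below uses only standard library calls; the function names are made up for illustration.

```cpp
#include <future>
#include <thread>

// Misconception 1: a deferred task runs on the thread that calls get().
bool deferred_runs_on_caller() {
    const auto caller = std::this_thread::get_id();
    auto f = std::async(std::launch::deferred,
                        [] { return std::this_thread::get_id(); });
    return f.get() == caller;
}

// Misconception 2: after get(), a std::future no longer refers to shared state,
// so a second get() would be undefined behavior.
bool get_invalidates_future() {
    auto f = std::async(std::launch::deferred, [] { return 1; });
    f.get();
    return !f.valid();
}
```

Checking valid() before calling get() is a cheap way to catch accidental double reads during development.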
Project Specification
What You Will Build
A Task Coordinator library that:
- Accepts task definitions with explicit dependencies
- Builds a Directed Acyclic Graph (DAG) of task execution order
- Executes tasks in parallel when dependencies allow
- Propagates results from upstream to downstream tasks
- Handles failures by cancelling dependent tasks
- Provides progress reporting and timing
Functional Requirements
FR1: Task Definition
- Tasks are defined as: ID + callable + list of dependency IDs
- Callables can be lambdas, function pointers, or std::function
- Tasks can return any copyable/movable type
- Tasks can accept results from their dependencies
FR2: Dependency Management
- Detect cycles during task graph construction
- Ensure tasks only run after all dependencies complete
- Support diamond dependencies (A -> B, A -> C, B -> D, C -> D)
- Allow tasks with no dependencies (entry points)
FR3: Parallel Execution
- Independent tasks run in parallel on a thread pool
- Number of worker threads is configurable
- Tasks that share dependencies correctly receive the same result
FR4: Result Propagation
- Task results are passed to dependent tasks
- Results are available via shared_future for diamond patterns
- Final results are retrievable after execution completes
FR5: Error Handling
- Exceptions in tasks are captured and propagated
- Downstream tasks are cancelled when upstream fails
- Clear error reporting indicates which task failed
FR6: Progress & Timing
- Report when each task starts and completes
- Track execution time per task
- Report total graph execution time
Non-Functional Requirements
- Thread Safety: All coordinator methods are thread-safe
- No Busy Waiting: Workers wait efficiently (condition variables)
- Clean Shutdown: Graceful termination with no resource leaks
- Deterministic Execution: Given same inputs, same logical execution order
- Modern C++: Uses C++17 or later features appropriately
Example Usage and Output
// Define tasks
TaskCoordinator coordinator(4); // 4 worker threads
// Task A: Download data (no dependencies)
coordinator.add_task("A", []() -> std::string {
std::this_thread::sleep_for(std::chrono::milliseconds(1200));
return download_file("data.json");
}, {});
// Task B: Parse JSON (depends on A)
coordinator.add_task("B", [](const std::string& json) -> Document {
std::this_thread::sleep_for(std::chrono::milliseconds(800));
return parse_json(json);
}, {"A"});
// Task C: Validate schema (depends on A)
coordinator.add_task("C", [](const std::string& json) -> bool {
std::this_thread::sleep_for(std::chrono::milliseconds(300));
return validate_schema(json);
}, {"A"});
// Task D: Transform data (depends on B AND C)
coordinator.add_task("D", [](const Document& doc, bool valid) -> TransformedData {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
return transform(doc);
}, {"B", "C"});
// Task E: Upload results (depends on D)
coordinator.add_task("E", [](const TransformedData& data) -> void {
std::this_thread::sleep_for(std::chrono::milliseconds(900));
upload(data);
}, {"D"});
// Execute the graph
auto report = coordinator.run();
Expected Output (Success Case):
$ ./task_coordinator
Submitting task graph:
Task A: Download data.json (no dependencies)
Task B: Parse JSON (depends on: A)
Task C: Validate schema (depends on: A)
Task D: Transform data (depends on: B, C)
Task E: Upload results (depends on: D)
Building execution plan...
Detected 5 tasks in 4 execution layers
Layer 0: [A]
Layer 1: [B, C] <- can run in parallel
Layer 2: [D]
Layer 3: [E]
[00:00.000] Executor: Starting Task A...
[00:01.203] Executor: Task A completed (1.203s) -> "data.json contents..."
[00:01.205] Executor: Starting Task B (dependency A satisfied)...
[00:01.206] Executor: Starting Task C (dependency A satisfied)...
[00:01.509] Executor: Task C completed (0.303s) -> true
[00:02.008] Executor: Task B completed (0.803s) -> Document{...}
[00:02.010] Executor: Starting Task D (dependencies B, C satisfied)...
[00:02.512] Executor: Task D completed (0.502s) -> TransformedData{...}
[00:02.514] Executor: Starting Task E (dependency D satisfied)...
[00:03.418] Executor: Task E completed (0.904s)
================================================================================
EXECUTION COMPLETE
================================================================================
Total tasks: 5
Completed: 5
Failed: 0
Cancelled: 0
Total time: 3.418s
Sequential time: 3.715s (if run serially)
Speedup: 1.09x (limited by critical path A->B->D->E)
Per-task timing (share of sequential time):
Task A: 1.203s (32.4%)
Task B: 0.803s (21.6%)
Task C: 0.303s (8.2%)
Task D: 0.502s (13.5%)
Task E: 0.904s (24.3%)
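The modest speedup follows from the shape of the graph: only C runs alongside other work. A short sketch of the arithmetic, using the nominal sleep durations from the example usage (1200, 800, 300, 500, 900 ms); the names critical_path_ms and sequential_ms are hypothetical helpers, and the recursion assumes an acyclic graph.

```cpp
#include <algorithm>
#include <string>
#include <unordered_map>
#include <vector>

using DepMap = std::unordered_map<std::string, std::vector<std::string>>;
using CostMap = std::unordered_map<std::string, long>;

// Longest chain of work that must finish before `id` completes, including `id`.
long critical_path_ms(const DepMap& deps, const CostMap& cost, const std::string& id) {
    long longest_dep = 0;
    for (const auto& d : deps.at(id)) {
        longest_dep = std::max(longest_dep, critical_path_ms(deps, cost, d));
    }
    return longest_dep + cost.at(id);
}

// Total work if every task ran one after another.
long sequential_ms(const CostMap& cost) {
    long total = 0;
    for (const auto& entry : cost) total += entry.second;
    return total;
}
```

For the example graph the critical path through E is 3400 ms and the sequential total is 3700 ms, so no number of workers can push the speedup past roughly 3700 / 3400 ≈ 1.09x.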
Expected Output (Failure Case):
$ ./task_coordinator --inject-failure B
[00:00.000] Executor: Starting Task A...
[00:01.205] Executor: Task A completed (1.205s)
[00:01.207] Executor: Starting Task B (dependency A satisfied)...
[00:01.208] Executor: Starting Task C (dependency A satisfied)...
[00:01.512] Executor: Task C completed (0.304s)
[00:01.823] Executor: Task B FAILED: JSON parse error at line 42
[00:01.824] Executor: Cancelling Task D (dependency B failed)
[00:01.824] Executor: Cancelling Task E (dependency D cancelled)
================================================================================
EXECUTION FAILED
================================================================================
Total tasks: 5
Completed: 2 (A, C)
Failed: 1 (B)
Cancelled: 2 (D, E)
Failure chain:
B: JSON parse error at line 42
-> D: cancelled (dependency B failed)
-> E: cancelled (dependency D cancelled)
Partial results available for: A, C
Real World Outcome
This project directly maps to real-world systems:
| This Project | Real World Equivalent |
|---|---|
| Task | CI/CD job, ETL step, build target |
| Dependencies | Job dependencies in GitHub Actions, Make dependencies |
| Task Graph | DAG in Apache Airflow, Bazel build graph |
| Coordinator | Scheduler in Kubernetes, Celery task queue |
| Result passing | Data pipeline outputs |
After completing this project, you will understand:
- How build systems like Bazel schedule parallel builds
- How CI/CD systems like GitHub Actions handle job dependencies
- How data orchestrators like Airflow manage task execution
- The foundation for implementing co_await and coroutine schedulers
Solution Architecture
High-Level Design
TASK COORDINATOR
┌─────────────────────────────────────────────────────────────────────────────┐
│ │
│ ┌─────────────────┐ ┌──────────────────┐ ┌───────────────────────┐ │
│ │ Task Registry │───>│ Graph Builder │───>│ Execution Planner │ │
│ │ │ │ │ │ │ │
│ │ - ID -> Task │ │ - Cycle detect │ │ - Topological sort │ │
│ │ - Dependencies │ │ - Adjacency list │ │ - Parallelism layers │ │
│ └─────────────────┘ └──────────────────┘ └───────────┬───────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Thread Pool Executor │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Worker 1 │ │ Worker 2 │ │ Worker 3 │ │ Worker 4 │ ... │ │
│ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │
│ │ │ │ │ │ │ │
│ │ └──────────────┴──────────────┴──────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────┐ │ │
│ │ │ Ready Queue │ <── Tasks with all deps met │ │
│ │ │ (thread-safe) │ │ │
│ │ └─────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ Result Storage │ │
│ │ │ │
│ │ Task A ──> shared_future<string> │ │
│ │ Task B ──> shared_future<Document> │ │
│ │ Task C ──> shared_future<bool> │ │
│ │ ... │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
Task Execution Flow
Task Lifecycle
REGISTERED PENDING RUNNING COMPLETED/FAILED
│ │ │ │
│ Graph built │ Deps satisfied │ Execution │
▼ ▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ Task │ ────> │ Task │ ────> │ Task │ ────> │ Task │
│ waiting │ │ ready │ │ running │ │ done │
└─────────┘ └─────────┘ └─────────┘ └─────────┘
│ │
│ │
▼ ▼
Ready Queue Notify dependents
(picked by worker) (check if now ready)
Diamond Dependency Pattern
Task A (returns string)
/ \
/ \
shared_future<string> shared_future<string>
| |
▼ ▼
Task B Task C
(needs A's result) (needs A's result)
| |
future<Document> future<bool>
\ /
\ /
\ shared_future /
\ (both) /
\ /
▼ ▼
Task D
(needs B's Document AND C's bool)
Key Components
| Component | Responsibility | Key Data |
|---|---|---|
| Task | Wraps callable with dependencies | ID, callable, dep IDs, promise, state |
| TaskGraph | Manages task relationships | Adjacency list, in-degree counts |
| Executor | Runs tasks on thread pool | Worker threads, ready queue, mutex/CV |
| ResultStore | Holds completed task results | Map of ID -> shared_future |
| Scheduler | Decides which tasks run next | Topological ordering, layer assignment |
Data Structures
// Task states
enum class TaskState {
Pending, // Waiting for dependencies
Ready, // Dependencies satisfied, waiting for worker
Running, // Currently executing
Completed, // Finished successfully
Failed, // Threw exception
Cancelled // Upstream dependency failed
};
// Single task representation
template<typename Result>
struct Task {
std::string id;
std::function<Result(/* dependency results */)> callable;
std::vector<std::string> dependencies;
std::promise<Result> promise;
std::atomic<TaskState> state{TaskState::Pending};
std::chrono::steady_clock::time_point start_time;
std::chrono::steady_clock::time_point end_time;
};
// Type-erased task for heterogeneous storage
struct TaskBase {
std::string id;
std::vector<std::string> dependencies;
std::atomic<TaskState> state{TaskState::Pending};
virtual void execute(/* result store */) = 0;
virtual ~TaskBase() = default;
};
// Graph structure
struct TaskGraph {
std::unordered_map<std::string, std::unique_ptr<TaskBase>> tasks;
std::unordered_map<std::string, std::vector<std::string>> dependents; // reverse edges
std::unordered_map<std::string, size_t> in_degree; // remaining deps
};
// Thread pool with ready queue
struct Executor {
std::vector<std::thread> workers;
std::queue<TaskBase*> ready_queue;
std::mutex queue_mutex;
std::condition_variable queue_cv;
std::atomic<bool> shutdown{false};
std::atomic<size_t> tasks_remaining{0};
};
// Result storage (type-erased)
struct ResultStore {
std::unordered_map<std::string, std::any> results; // stores shared_future<T>
std::mutex mutex;
template<typename T>
std::shared_future<T> get(const std::string& id);
template<typename T>
void store(const std::string& id, std::shared_future<T> future);
};
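One way the type-erased base and the typed task could fit together is sketched below. It is an illustration under simplifying assumptions: TypedTask is a hypothetical name, the callable takes no dependency results, and the base class is reduced to the execute() method.

```cpp
#include <exception>
#include <functional>
#include <future>
#include <type_traits>
#include <utility>

struct TaskBase {
    virtual void execute() = 0;
    virtual ~TaskBase() = default;
};

// Concrete task for result type R; the executor only ever sees TaskBase*.
template <typename R>
class TypedTask : public TaskBase {
    std::function<R()> fn_;
    std::promise<R> promise_;
    std::shared_future<R> result_;  // declared after promise_, so initialized after it

public:
    explicit TypedTask(std::function<R()> fn)
        : fn_(std::move(fn)), result_(promise_.get_future().share()) {}

    // Any number of consumers can copy this handle.
    std::shared_future<R> result() const { return result_; }

    void execute() override {
        try {
            if constexpr (std::is_void_v<R>) {
                fn_();
                promise_.set_value();  // void tasks signal completion only
            } else {
                promise_.set_value(fn_());
            }
        } catch (...) {
            // Route any failure through the promise instead of the worker thread.
            promise_.set_exception(std::current_exception());
        }
    }
};
```

The if constexpr branch is one answer to the void-result problem: std::promise<void> has a set_value() that takes no argument, so the two cases need different calls.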
Algorithm Overview
Task Graph Construction:
- Accept task definitions via add_task()
- Store task and dependency edges
- Build reverse adjacency list (who depends on whom)
- Calculate in-degrees (how many deps each task has)
Cycle Detection (before execution):
- Perform topological sort using Kahn’s algorithm
- If not all tasks are visited, a cycle exists
- Report the cycle to the user
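The cycle check above can be sketched with Kahn's algorithm as follows. The function name topo_order and the DepMap alias are hypothetical, and the sketch assumes every dependency ID also appears as a key in the map; an unknown ID is reported the same way as a cycle.

```cpp
#include <cstddef>
#include <optional>
#include <queue>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// task ID -> IDs of the tasks it depends on
using DepMap = std::unordered_map<std::string, std::vector<std::string>>;

// Returns one valid execution order, or std::nullopt if some tasks can never run.
std::optional<std::vector<std::string>> topo_order(const DepMap& deps) {
    std::unordered_map<std::string, std::size_t> in_degree;
    DepMap dependents;  // reverse edges: task -> tasks waiting on it
    for (const auto& [id, ds] : deps) {
        in_degree[id] = ds.size();
        for (const auto& d : ds) dependents[d].push_back(id);
    }

    std::queue<std::string> ready;
    for (const auto& [id, n] : in_degree) {
        if (n == 0) ready.push(id);
    }

    std::vector<std::string> order;
    while (!ready.empty()) {
        std::string id = std::move(ready.front());
        ready.pop();
        for (const auto& next : dependents[id]) {
            if (--in_degree[next] == 0) ready.push(next);
        }
        order.push_back(std::move(id));
    }

    // Tasks on a cycle never reach in-degree 0, so they are missing from order.
    if (order.size() != deps.size()) return std::nullopt;
    return order;
}
```

Reporting which tasks form the cycle takes a little more work: the tasks whose in-degree is still nonzero at the end are the candidates.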
Execution Scheduling:
- Find all tasks with in-degree 0 (no dependencies)
- Add them to the ready queue
- Workers pick tasks from ready queue
- When a task completes:
  - Store result in ResultStore
  - Decrement in-degree of all dependents
  - If any dependent’s in-degree becomes 0, add to ready queue
- Repeat until all tasks complete or failure occurs
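The scheduling loop can be sketched in miniature as follows. This is a simplified illustration, not the full executor: run_graph is a hypothetical name, tasks have no bodies or results, and the graph is assumed to be acyclic (with a cycle, the workers would wait forever). It returns task IDs in completion order.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

using DepMap = std::unordered_map<std::string, std::vector<std::string>>;

std::vector<std::string> run_graph(const DepMap& deps, std::size_t worker_count) {
    std::unordered_map<std::string, std::size_t> in_degree;
    DepMap dependents;
    std::queue<std::string> ready;
    for (const auto& [id, ds] : deps) {
        in_degree[id] = ds.size();
        for (const auto& d : ds) dependents[d].push_back(id);
        if (ds.empty()) ready.push(id);  // entry points start out ready
    }

    std::mutex m;
    std::condition_variable cv;
    std::size_t remaining = deps.size();
    std::vector<std::string> completed;

    auto worker = [&] {
        std::unique_lock<std::mutex> lk(m);
        for (;;) {
            // Sleep until there is work or the whole graph is finished.
            cv.wait(lk, [&] { return remaining == 0 || !ready.empty(); });
            if (ready.empty()) return;  // remaining == 0
            std::string id = std::move(ready.front());
            ready.pop();

            lk.unlock();
            // A real task body would run here, outside the lock.
            lk.lock();

            completed.push_back(id);
            --remaining;
            for (const auto& next : dependents[id]) {
                if (--in_degree[next] == 0) ready.push(next);
            }
            cv.notify_all();  // new work may be ready, or everything is done
        }
    };

    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < worker_count; ++i) workers.emplace_back(worker);
    for (auto& t : workers) t.join();
    return completed;
}
```

All bookkeeping happens under one mutex, which keeps the sketch easy to reason about. A production executor would also need the failure path described next.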
Failure Propagation:
- When a task fails, mark it as Failed
- Find all transitive dependents (BFS/DFS)
- Mark them as Cancelled
- Set their promises to a cancellation exception
- Continue with remaining independent tasks
Implementation Guide
Development Environment Setup
# Compiler with C++17 support
# GCC 8+, Clang 7+, or MSVC 2017+
# Ubuntu/Debian
sudo apt-get install g++ cmake build-essential
# macOS
xcode-select --install
brew install cmake
# Create project structure
mkdir -p promise-task-coordinator/{src,include,tests,examples}
cd promise-task-coordinator
# CMakeLists.txt
cat > CMakeLists.txt << 'EOF'
cmake_minimum_required(VERSION 3.16)
project(TaskCoordinator CXX)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# Thread support
find_package(Threads REQUIRED)
# Enable sanitizers for development
add_compile_options(-fsanitize=thread)
add_link_options(-fsanitize=thread)
add_library(task_coordinator
src/task.cpp
src/executor.cpp
src/graph.cpp
src/coordinator.cpp
)
target_include_directories(task_coordinator PUBLIC include)
target_link_libraries(task_coordinator Threads::Threads)
add_executable(demo examples/demo.cpp)
target_link_libraries(demo task_coordinator)
enable_testing()
add_executable(tests tests/test_main.cpp)
target_link_libraries(tests task_coordinator)
add_test(NAME unit_tests COMMAND tests)
EOF
# Build
mkdir build && cd build
cmake .. && make
Project Structure
promise-task-coordinator/
├── include/
│ ├── task_coordinator.hpp # Main public interface
│ ├── task.hpp # Task types and states
│ ├── graph.hpp # DAG representation
│ ├── executor.hpp # Thread pool
│ └── result_store.hpp # Type-erased result storage
├── src/
│ ├── task.cpp
│ ├── graph.cpp
│ ├── executor.cpp
│ └── coordinator.cpp
├── tests/
│ ├── test_main.cpp
│ ├── test_basic.cpp # Single task, simple deps
│ ├── test_diamond.cpp # Diamond pattern
│ ├── test_failure.cpp # Exception propagation
│ └── test_parallel.cpp # Parallelism verification
├── examples/
│ ├── demo.cpp # Basic usage
│ ├── build_system.cpp # Simulated build graph
│ └── data_pipeline.cpp # ETL simulation
├── CMakeLists.txt
└── README.md
The Core Question You’re Answering
“How do you express ‘do A, then when A is done, do B with A’s result’ without callbacks or shared state?”
This question forces you to understand that futures are values representing eventual results. You can pass them around, store them, and compose them. The synchronization is implicit in the future/promise contract, not in explicit locks.
Concepts You Must Understand First
Before starting this project, verify you can answer these questions:
| Concept | Self-Check Question | Where to Learn |
|---|---|---|
| Move semantics | Why is std::future move-only? What happens if you try to copy it? | C++ Concurrency in Action 4.2.1 |
| std::function | How does std::function store and invoke different callable types? | cppreference, any C++ book |
| Exception safety | What does it mean for an operation to provide the strong exception guarantee? | Effective C++ Items 25-29 |
| Thread basics | What’s the difference between std::thread and what the thread is running? | C++ Concurrency in Action 2.1 |
| Condition variables | Why must you hold a mutex when calling wait()? | C++ Concurrency in Action 4.1 |
| Template type deduction | How does auto deduce types for lambda return values? | Effective Modern C++ Items 1-4 |
Deep Concept Exploration
- The Future/Promise Contract
  - What exactly happens when you call promise::set_value()?
  - What memory synchronization does the future/promise pair provide?
  - What happens if you call set_value() twice?
  - C++ Concurrency in Action Chapter 4.2
- std::async Launch Policies
  - What’s the difference between launch::async and launch::deferred?
  - What happens with the default policy?
  - Why might std::async NOT spawn a thread?
  - C++ Concurrency in Action Chapter 4.2.1
- shared_future Semantics
  - Why does shared_future::get() return const T&?
  - What happens if multiple threads call get() simultaneously?
  - When do you need shared_future vs future?
  - C++ Concurrency in Action Chapter 4.2.2
- Exception Propagation
  - How does std::current_exception() capture an exception?
  - What’s the difference between set_exception() and set_exception_at_thread_exit()?
  - How do you re-throw a captured exception?
  - C++ Concurrency in Action Chapter 4.2.4
Questions to Guide Your Design
Task Representation
- How do you represent a task that can return any type? Do you use std::any, templates, or type erasure?
- How do you store the callable? Is std::function sufficient, or do you need something more flexible?
- Where does the promise live? Who owns it, and how is it accessed by the executor?
- How do you handle tasks with different numbers of dependencies? Does your callable signature support variadic dependency results?
Dependency Resolution
- How do you detect cycles in the task graph? When do you check for cycles?
- How do you know when all of a task’s dependencies are satisfied?
- How do you efficiently track which tasks are ready to run?
- For diamond dependencies, how do you ensure A’s result is computed once but read twice?
Execution
- How do workers know when new tasks are ready? Polling or notification?
- How do you handle a task that throws an exception? What happens to its dependents?
- What’s the shutdown strategy? Do you wait for all tasks, or can you cancel mid-execution?
- How do you handle the case where a task’s dependency was cancelled (not failed)?
Result Management
- How do you retrieve results for tasks with different return types?
- How do you handle tasks that return void?
- When are task results cleaned up? Do you keep them forever or allow disposal?
Thinking Exercise
Before writing code, work through this scenario on paper:
Task Graph:
A (returns int: 10)
/ \
B C
| | \
D E F
\ /
G (needs D and E results)
B: takes A's result, returns A * 2
C: takes A's result, returns A + 5
D: takes B's result, returns B + 1
E: takes C's result, returns C * 2
F: takes C's result (fails with exception)
G: takes D and E results, returns D + E
Questions to Answer:
- Draw the execution timeline assuming 2 worker threads. Which tasks run in parallel?
- What shared_future objects are needed? Trace the ownership of A’s result.
- If F fails, what happens to G? (Trick question: think carefully about the dependency graph)
- What is the final value of G if all tasks succeed?
- Write out the sequence of:
  - promise::set_value() calls
  - future::get() calls
  - shared_future copies
- If C throws an exception instead of F:
  - Which tasks are cancelled?
  - Which tasks complete normally?
  - What does G.get() throw?
Hints in Layers
Reveal hints progressively as needed:
Hint 1: Getting Started - Core Abstractions
Start with the minimal task representation:
// Start simple: tasks with no input dependencies
struct SimpleTask {
std::string id;
std::function<void()> work;
std::vector<std::string> deps;
std::promise<void> done;
};
// Get this working first:
// 1. Add tasks to a map
// 2. Find tasks with no deps (in_degree == 0)
// 3. Run those tasks on the thread pool
// 4. When done, signal dependents
Key insight: Get a working system with void tasks first. Add result passing later.
Hint 2: Result Passing - Type Erasure Strategy
The challenge: tasks have different result types. Solution: type erase with std::any:
class ResultStore {
std::unordered_map<std::string, std::any> results_;
std::mutex mutex_;
public:
template<typename T>
void store(const std::string& id, std::shared_future<T> future) {
std::lock_guard lock(mutex_);
results_[id] = future;
}
template<typename T>
std::shared_future<T> get(const std::string& id) {
std::lock_guard lock(mutex_);
return std::any_cast<std::shared_future<T>>(results_.at(id));
}
};
The task’s execute method retrieves typed results:
// Inside Task<R, Deps...>::execute()
auto dep_results = std::make_tuple(
result_store.get<Dep1Type>("dep1").get(),
result_store.get<Dep2Type>("dep2").get()
);
auto result = std::apply(callable_, dep_results);
promise_.set_value(std::move(result));
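To check your understanding of the std::any approach, the sketch below repeats the ResultStore from this hint and exercises it with two consumers of the same upstream result. The functions two_readers_sum and wrong_type_throws are hypothetical test helpers.

```cpp
#include <any>
#include <future>
#include <mutex>
#include <string>
#include <unordered_map>

class ResultStore {
    std::unordered_map<std::string, std::any> results_;
    std::mutex mutex_;

public:
    template <typename T>
    void store(const std::string& id, std::shared_future<T> future) {
        std::lock_guard<std::mutex> lock(mutex_);
        results_[id] = future;
    }

    template <typename T>
    std::shared_future<T> get(const std::string& id) {
        std::lock_guard<std::mutex> lock(mutex_);
        // Throws std::bad_any_cast if T does not match the stored type.
        return std::any_cast<std::shared_future<T>>(results_.at(id));
    }
};

// Two downstream readers pull the same upstream result out of the store.
int two_readers_sum() {
    ResultStore store;
    std::promise<int> a;
    store.store<int>("A", a.get_future().share());
    a.set_value(10);
    int b = store.get<int>("A").get() * 2;  // first consumer
    int c = store.get<int>("A").get() + 5;  // second consumer
    return b + c;
}

// Asking for the wrong type fails loudly instead of reinterpreting the value.
bool wrong_type_throws() {
    ResultStore store;
    std::promise<int> a;
    store.store<int>("A", a.get_future().share());
    try {
        store.get<std::string>("A");
    } catch (const std::bad_any_cast&) {
        return true;
    }
    return false;
}
```

The type check happens at run time, so a mismatch between a task's declared result type and what a dependent asks for only surfaces when the graph executes. Validating types at graph construction is a possible extension.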
Hint 3: Scheduling - In-Degree Tracking
Use in-degree counting for efficient ready detection:
class Scheduler {
std::unordered_map<std::string, size_t> in_degree_; // remaining deps
std::unordered_map<std::string, std::vector<std::string>> dependents_;
void task_completed(const std::string& id) {
for (const auto& dep_id : dependents_[id]) {
if (--in_degree_[dep_id] == 0) {
mark_ready(dep_id);
}
}
}
std::vector<std::string> initial_ready_tasks() {
std::vector<std::string> ready;
for (const auto& [id, degree] : in_degree_) {
if (degree == 0) ready.push_back(id);
}
return ready;
}
};
The ready queue is populated from:
- Initially: all tasks with in_degree == 0
- Dynamically: tasks whose last dependency just completed
Hint 4: Exception Handling - Cascading Cancellation
When a task fails, propagate cancellation to all transitive dependents:
void handle_task_failure(const std::string& failed_id, std::exception_ptr ex) {
// Mark the failed task
tasks_[failed_id]->state = TaskState::Failed;
tasks_[failed_id]->set_exception(ex);
// BFS to find all transitive dependents
std::queue<std::string> to_cancel;
for (const auto& dep_id : dependents_[failed_id]) {
to_cancel.push(dep_id);
}
while (!to_cancel.empty()) {
auto id = to_cancel.front();
to_cancel.pop();
if (tasks_[id]->state == TaskState::Pending ||
tasks_[id]->state == TaskState::Ready) {
tasks_[id]->state = TaskState::Cancelled;
tasks_[id]->set_exception(
std::make_exception_ptr(
TaskCancelledException(failed_id)));
for (const auto& next_id : dependents_[id]) {
to_cancel.push(next_id);
}
}
}
}
Important: A cancelled task’s promise must still be fulfilled (with an exception) so that get() doesn’t hang.
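The two halves of this hint can be tested in isolation. In the sketch below, transitive_dependents collects the cancellation set with a visited check so that diamond-shaped graphs do not enqueue a task twice, and cancelled_get_throws shows that a promise fulfilled with an exception unblocks get(). TaskCancelled stands in for the TaskCancelledException used above; all three names are illustrative.

```cpp
#include <exception>
#include <future>
#include <queue>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

// task ID -> IDs of the tasks that depend on it (reverse edges)
using DepMap = std::unordered_map<std::string, std::vector<std::string>>;

// Hypothetical exception type used to fulfil the promises of cancelled tasks.
struct TaskCancelled : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// BFS over reverse edges; returns every task downstream of `failed`, in visit order.
std::vector<std::string> transitive_dependents(const DepMap& dependents,
                                               const std::string& failed) {
    std::vector<std::string> out;
    std::unordered_set<std::string> seen;
    std::queue<std::string> frontier;
    frontier.push(failed);
    while (!frontier.empty()) {
        std::string id = frontier.front();
        frontier.pop();
        auto it = dependents.find(id);
        if (it == dependents.end()) continue;  // leaf task: nothing depends on it
        for (const auto& next : it->second) {
            if (seen.insert(next).second) {  // first visit only
                out.push_back(next);
                frontier.push(next);
            }
        }
    }
    return out;
}

// A cancelled task's consumers see an exception instead of blocking forever.
bool cancelled_get_throws() {
    std::promise<int> p;
    std::future<int> f = p.get_future();
    p.set_exception(std::make_exception_ptr(TaskCancelled("dependency B failed")));
    try {
        f.get();
    } catch (const TaskCancelled&) {
        return true;
    }
    return false;
}
```

For the example graph from the specification, a failure in B yields the cancellation set D, E, which matches the failure-case output.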
The Interview Questions They’ll Ask
After completing this project, you will be ready for these common interview questions:
Question 1: “Explain std::future and std::promise. How do they differ from callbacks?”
What they want: Understanding that future/promise is a one-shot channel. The promise is the write end (producer), future is the read end (consumer). Unlike callbacks, futures are values that can be stored, passed around, and composed.
Strong answer elements:
- Future represents an eventual value, not an invocation
- Promise can only be set once (one-shot)
- Automatic exception propagation across threads
- get() blocks until value is available
- Comparison: callbacks invert control flow, futures preserve it
Question 2: “What’s the difference between std::future and std::shared_future?”
What they want: Understanding of move semantics and multiple consumer patterns.
Strong answer elements:
- std::future is move-only; get() can only be called once
- std::shared_future is copyable; get() returns const reference
- Use shared_future when multiple threads need the same result
- Common pattern: future.share() to convert
Question 3: “How would you implement a task scheduler with dependencies?”
What they want: System design thinking, understanding of DAG execution.
Strong answer elements:
- Represent as DAG with tasks as nodes
- Topological sort for execution order
- In-degree tracking for ready detection
- Thread pool picks from ready queue
- Completion triggers dependent check
Question 4: “What happens if a thread throws an exception and has a promise?”
What they want: Exception safety in concurrent code.
Strong answer elements:
- Must catch and call promise::set_exception()
- Use std::current_exception() to capture
- If promise destroyed without setting, get() throws broken_promise
- Wrapper pattern to ensure exception capture
Question 5: “How do you avoid deadlocks with futures?”
What they want: Understanding of blocking in get() and circular waits.
Strong answer elements:
- Never call get() on a future from the thread that should fulfill it
- Avoid circular dependencies in task graphs
- Be careful with thread pool starvation (all workers blocked on get())
- Use wait_for() with timeout for defensive programming
- Cycle detection in dependency graphs
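The wait_for() point can be made concrete with a small wrapper. This is a sketch; get_with_timeout is a hypothetical helper, not a standard function.

```cpp
#include <chrono>
#include <future>
#include <optional>

// Returns the value if it arrives within `timeout`, otherwise std::nullopt.
// The future stays valid on timeout, so the caller can retry later.
template <typename T>
std::optional<T> get_with_timeout(std::future<T>& f,
                                  std::chrono::milliseconds timeout) {
    if (f.wait_for(timeout) != std::future_status::ready) {
        return std::nullopt;  // timed out, or the task is deferred and not started
    }
    return f.get();
}
```

Note that wait_for() returns future_status::deferred immediately for a std::launch::deferred task, so this helper never runs such a task; it only observes futures that some other thread will fulfil.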
Question 6: “Compare std::async with using a thread pool”
What they want: Understanding of when to use each approach.
Strong answer elements:
- std::async creates threads per call (potentially)
- Thread pool amortizes thread creation cost
- std::async with deferred runs lazily
- Thread pool gives control over parallelism level
- For task graphs, thread pool is more appropriate
Books That Will Help
| Topic | Book | Chapter |
|---|---|---|
| Futures and promises fundamentals | C++ Concurrency in Action (2nd Ed) - Anthony Williams | Chapter 4.2 |
| std::async and launch policies | C++ Concurrency in Action (2nd Ed) - Anthony Williams | Chapter 4.2.1 |
| std::shared_future | C++ Concurrency in Action (2nd Ed) - Anthony Williams | Chapter 4.2.2 |
| Exception handling in futures | C++ Concurrency in Action (2nd Ed) - Anthony Williams | Chapter 4.2.4 |
| Designing concurrent data structures | C++ Concurrency in Action (2nd Ed) - Anthony Williams | Chapter 6 |
| Thread pool design | C++ Concurrency in Action (2nd Ed) - Anthony Williams | Chapter 9.1 |
| DAG scheduling algorithms | Introduction to Algorithms (CLRS) | Chapter 22 (Graph Algorithms) |
| Work scheduling in parallel systems | The Art of Multiprocessor Programming - Herlihy & Shavit | Chapter 16 |
Implementation Phases
Phase 1: Foundation (Days 1-3)
Goals:
- Basic task representation with void return type
- Task graph construction and cycle detection
- Simple sequential execution
Tasks:
- Implement Task class with ID, callable, dependencies
- Implement TaskGraph with add, cycle detection
- Implement topological sort
- Test: add tasks, verify ordering
Checkpoint: Can build a graph of void tasks and detect cycles.
Phase 2: Thread Pool Executor (Days 4-6)
Goals:
- Worker threads pick tasks from ready queue
- Proper shutdown and synchronization
- Basic parallelism working
Tasks:
- Implement ready queue with mutex/condition variable
- Implement worker thread loop
- Implement in-degree tracking and ready notification
- Test: verify tasks run in parallel
Checkpoint: Tasks run on multiple threads in dependency order.
Phase 3: Result Passing (Days 7-9)
Goals:
- Tasks return typed results
- Results passed to dependent tasks
- shared_future for diamond dependencies
Tasks:
- Add templated Task with result type
- Implement ResultStore with type erasure
- Add dependency result retrieval
- Handle shared_future for multiple consumers
- Test: diamond dependency pattern
Checkpoint: Results flow through the graph correctly.
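The diamond case can be tried in isolation before wiring it into the coordinator. This sketch uses std::async in place of the project's thread pool purely to keep it short; run_diamond is an illustrative name.

```cpp
#include <future>

// Diamond A -> {B, C} -> D with a single shared_future carrying A's result.
int run_diamond() {
    std::promise<int> a_promise;
    std::shared_future<int> a = a_promise.get_future().share();

    // B and C each hold their own copy of the shared_future; both may call get().
    auto b = std::async(std::launch::async, [a] { return a.get() * 2; });
    auto c = std::async(std::launch::async, [a] { return a.get() + 5; });

    a_promise.set_value(10);   // A completes exactly once
    return b.get() + c.get();  // D = 20 + 15
}
```

With a plain std::future, only one of B or C could take the value, because get() moves the result out and leaves the future invalid.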
Phase 4: Exception Handling (Days 10-12)
Goals:
- Exceptions captured and propagated
- Downstream tasks cancelled on failure
- Clean error reporting
Tasks:
- Wrap task execution with try/catch
- Call set_exception() on failure
- Implement cancellation cascade
- Generate failure report
- Test: various failure scenarios
Checkpoint: Failures propagate correctly, partial results available.
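The try/catch wrapper and a simple form of cascade can be sketched as below. The function names are hypothetical and the result type is fixed to int for brevity. Here a downstream task "cancels" by rethrowing its dependency's exception; a real coordinator may prefer to mark it cancelled without running it at all.

```cpp
#include <exception>
#include <functional>
#include <future>

// Runs `work` and fulfills `out` with either its value or its exception.
void run_and_fulfill(std::promise<int>& out, const std::function<int()>& work) {
    try {
        out.set_value(work());
    } catch (...) {
        out.set_exception(std::current_exception());
    }
}

// Downstream step: reading a failed dependency rethrows its exception inside
// the wrapper, so the failure is forwarded and the rest of the body is skipped.
void run_downstream(std::shared_future<int> dep, std::promise<int>& out) {
    run_and_fulfill(out, [dep] { return dep.get() + 1; });
}
```

Because every promise is fulfilled one way or the other, no consumer is left waiting on a task that will never finish.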
Phase 5: Polish & Extensions (Days 13-14)
Goals:
- Progress reporting
- Timing statistics
- Clean API
Tasks:
- Add timing instrumentation
- Implement progress callbacks
- Clean up public API
- Documentation
- Comprehensive testing
Checkpoint: Production-ready library with examples.
Key Implementation Decisions
| Decision | Options | Recommendation | Rationale |
|---|---|---|---|
| Task type erasure | std::any vs inheritance | std::any | Simpler, modern C++ |
| Result storage | Promises vs map of results | Map of shared_futures | Allows multiple reads |
| Cycle detection | DFS vs Kahn’s algorithm | Kahn’s | Also gives topological order |
| Thread pool | Fixed vs dynamic size | Fixed | Predictable, simpler |
| Ready queue | std::queue vs priority_queue | std::queue | FIFO is fair |
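Combining the first two recommendations gives a result store keyed by task ID, holding one std::shared_future<std::any> per task. The class below is a sketch under that assumption: the interface is invented for illustration, and declare() is assumed to be called for every task before any worker starts, since the map itself is not synchronized.

```cpp
#include <any>
#include <future>
#include <string>
#include <unordered_map>

// Hypothetical ResultStore: type-erased results that can be read many times.
class ResultStore {
public:
    // Registers a task and returns the promise its worker must fulfill.
    std::promise<std::any> declare(const std::string& id) {
        std::promise<std::any> p;
        results_[id] = p.get_future().share();
        return p;
    }

    // Blocks until `id` finishes. Throws std::bad_any_cast on a type mismatch
    // and std::out_of_range if the id was never declared.
    template <typename T>
    T get(const std::string& id) const {
        return std::any_cast<T>(results_.at(id).get());
    }

private:
    std::unordered_map<std::string, std::shared_future<std::any>> results_;
};
```

The cost of std::any is that a wrong type is only caught at run time, which is the trade-off against an inheritance-based design.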
Testing Strategy
Test Categories
| Category | Purpose | Example |
|---|---|---|
| Unit Tests | Test components in isolation | Future/promise pair works |
| Integration Tests | Test component interaction | Task graph + executor |
| Stress Tests | Verify under load | 1000 tasks, 16 threads |
| Failure Tests | Verify error handling | Exception propagation |
| Edge Cases | Handle unusual scenarios | Single task, empty graph |
Critical Test Cases
- Single Task (No Dependencies)
coordinator.add_task("A", []{ return 42; }, {});
auto result = coordinator.run();
EXPECT_EQ(result.get<int>("A"), 42);
- Linear Chain (A -> B -> C)
coordinator.add_task("A", []{ return 1; }, {});
coordinator.add_task("B", [](int a){ return a + 1; }, {"A"});
coordinator.add_task("C", [](int b){ return b + 1; }, {"B"});
// C should equal 3
- Diamond Pattern (A -> B,C -> D)
coordinator.add_task("A", []{ return 10; }, {});
coordinator.add_task("B", [](int a){ return a * 2; }, {"A"});
coordinator.add_task("C", [](int a){ return a + 5; }, {"A"});
coordinator.add_task("D", [](int b, int c){ return b + c; }, {"B", "C"});
// A runs once, B and C get shared_future, D = 20 + 15 = 35
- Failure Propagation
coordinator.add_task("A", []{ return 1; }, {});
coordinator.add_task("B", [](int){ throw std::runtime_error("fail"); return 0; }, {"A"});
coordinator.add_task("C", [](int){ return 3; }, {"B"}); // Should be cancelled
- Cycle Detection
coordinator.add_task("A", []{}, {"B"});
coordinator.add_task("B", []{}, {"A"});
EXPECT_THROW(coordinator.run(), CycleDetectedException);
- Parallel Execution Verification
// A, B, C have no dependencies - should run in parallel
using namespace std::chrono_literals; // for 100ms
std::atomic<int> concurrent_count{0};
std::atomic<int> max_concurrent{0}; // atomic: written by several worker threads
auto task = [&]{
    int c = ++concurrent_count;
    // Raise the recorded maximum with a CAS loop; a plain int would be a data race
    int prev = max_concurrent.load();
    while (prev < c && !max_concurrent.compare_exchange_weak(prev, c)) {}
    std::this_thread::sleep_for(100ms);
    --concurrent_count;
};
coordinator.add_task("A", task, {});
coordinator.add_task("B", task, {});
coordinator.add_task("C", task, {});
coordinator.run();
EXPECT_GE(max_concurrent.load(), 2); // At least 2 ran in parallel
Common Pitfalls & Debugging
Frequent Mistakes
| Pitfall | Symptom | Solution |
|---|---|---|
| Calling get() twice on future | Undefined behavior; implementations typically throw std::future_error (no_state) | Use shared_future or store result |
| Promise destroyed without setting | Exception: “broken promise” | Always set value or exception |
| Circular dependencies | Infinite wait or exception | Detect cycles before execution |
| Thread pool starvation | Deadlock when all workers blocked | Limit blocking gets, use timeouts |
| Forgetting to move promise | Compile error (deleted copy) | Use std::move() for promise capture |
| Race condition in ready check | Tasks missed or run twice | Protect in_degree with mutex |
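Two of these pitfalls are easy to reproduce in a few lines. The function names are illustrative only; the sketch assumes C++14 or later for the init-capture that moves the promise into the lambda.

```cpp
#include <future>
#include <thread>

// std::promise is move-only, so it is handed to the worker with std::move.
int produce_on_thread() {
    std::promise<int> p;
    std::future<int> f = p.get_future();
    std::thread worker([p = std::move(p)]() mutable { p.set_value(42); });
    int v = f.get();
    worker.join();
    return v;
}

// Destroying a promise without fulfilling it stores broken_promise in the
// shared state; get() then throws std::future_error.
bool broken_promise_detected() {
    std::future<int> f;
    {
        std::promise<int> p;
        f = p.get_future();
    }  // p destroyed here without set_value() or set_exception()
    try {
        f.get();
    } catch (const std::future_error& e) {
        return e.code() == std::future_errc::broken_promise;
    }
    return false;
}
```

Capturing the promise by reference instead of moving it compiles too, but leaves a dangling reference if the worker outlives the enclosing scope.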
Debugging Strategies
Deadlock diagnosis:
# Run with timeout
timeout 10 ./task_coordinator || echo "Possible deadlock"
# While the program hangs, attach GDB from another terminal to dump every thread's backtrace
gdb -batch -ex "thread apply all bt" -p "$(pgrep -n task_coordinator)"
Thread Sanitizer:
g++ -std=c++17 -g -fsanitize=thread -pthread -o coordinator coordinator.cpp
./coordinator # Reports data races
Logging strategy:
#define LOG_TASK(id, msg) \
std::cerr << "[" << std::this_thread::get_id() << "] " \
<< id << ": " << msg << std::endl
// In task execution:
LOG_TASK(id, "starting");
LOG_TASK(id, "completed with value: " + std::to_string(result));
Verification:
- Print the topological order before execution
- Log each dependency satisfaction
- Print final states of all tasks
Extensions & Challenges
Beginner Extensions
- Task timeout: Cancel tasks that exceed a time limit
- Priority levels: High-priority tasks run first when ready
- Progress callback: Notify external code as tasks complete
Intermediate Extensions
- Continuation chains: Add .then() method to futures
- Task groups: Cancel entire groups of related tasks
- Retry logic: Automatically retry failed tasks with backoff
- Resource limits: Tasks can declare resource requirements
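As a starting point for the continuation extension: std::future has no .then() member, so a first attempt can be a free function built on std::async. This is a naive sketch with a hypothetical name (then), it assumes a non-void T, and it parks one thread per pending continuation, which a real implementation would avoid.

```cpp
#include <future>
#include <utility>

// Naive continuation: returns a future for fn(result of fut).
// An exception stored in `fut` propagates into the returned future.
template <typename T, typename F>
auto then(std::future<T> fut, F&& fn) {
    return std::async(
        std::launch::async,
        [fut = std::move(fut), fn = std::forward<F>(fn)]() mutable {
            return fn(fut.get());  // blocks this helper thread until ready
        });
}
```

The challenge in the extension is to run the continuation from the thread that fulfills the promise, or to resubmit it to the pool, rather than blocking a thread on get().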
Advanced Extensions
- Distributed execution: Tasks can run on remote nodes
- Persistent task graphs: Serialize/deserialize task graphs
- Incremental execution: Re-run only changed tasks (like Make)
- Coroutine integration: Make tasks co_await-able
Research Extensions
- Optimal scheduling: Minimize total execution time given task durations
- Fault tolerance: Continue despite worker failures
- Speculative execution: Start likely-needed tasks early
Real-World Connections
Industry Systems Using These Patterns
| System | Pattern Used | Scale |
|---|---|---|
| Bazel/Buck | Task graph, incremental execution | Billions of artifacts at Google |
| Apache Airflow | DAG scheduling, dependency tracking | ETL pipelines |
| GitHub Actions | Job dependencies, parallel execution | CI/CD |
| Kubernetes Jobs | Dependency ordering, failure handling | Container orchestration |
| Celery | Task queue, result backend | Python async tasks |
| TBB/OpenMP | Task-based parallelism | HPC, games |
How Production Systems Differ
Your implementation is pedagogical. Production systems add:
- Persistence: Tasks survive process restarts
- Distribution: Tasks run across machines
- Observability: Metrics, tracing, logging
- Backpressure: Handle more tasks than workers
- Versioning: Handle task definition changes
- Security: Authentication, authorization
Career Applications
This project prepares you for:
- Build system engineering: Bazel, Buck, Gradle internals
- Data platform engineering: Airflow, Prefect, Dagster
- Game engine development: Job systems, async loading
- High-frequency trading: Task scheduling, latency optimization
- Compiler engineering: Parallel compilation, incremental builds
Resources
Essential Reading
- C++ Concurrency in Action, 2nd Edition (Anthony Williams) - Chapter 4
- cppreference.com - std::future, std::promise, std::async pages
- Herb Sutter’s “Effective Concurrency” series on DDJ
Video Resources
- CppCon 2014: “C++ Futures and How to Use Them” - Scott Meyers
- CppCon 2016: “The Continuing Future of Concurrency in C++” - Anthony Williams
- CppCon 2020: “Back to Basics: Concurrency” - Arthur O’Dwyer
Code References
- folly::Future (Facebook’s continuation-based futures)
- Intel TBB task_group
- HPX (High Performance ParalleX) futures
- Boost.Fiber (cooperative scheduling)
Related Projects in This Series
- Previous: P01 - Multi-Threaded Log Aggregator - Threading fundamentals, mutexes, condition variables
- Next: P03 - Work-Stealing Thread Pool - Advanced thread pool design
- Related: P10 - Async Task Framework - Full coroutine-based async system
Self-Assessment Checklist
Before considering this project complete, verify:
Understanding
- I can explain why futures are better than callbacks for async coordination
- I understand the one-shot nature of promise/future pairs
- I can explain when to use shared_future vs future
- I understand how exceptions propagate through futures
- I can identify potential deadlocks in task graphs
Implementation
- Task graph correctly rejects cycles
- Independent tasks run in parallel
- Diamond dependencies work (shared result)
- Exceptions propagate to downstream tasks
- Cancelled tasks don’t run
- Thread pool shuts down cleanly
Testing
- All critical test cases pass
- Thread sanitizer reports no races
- Performance scales with thread count
- Edge cases handled (empty graph, single task)
Growth
- I can now explain task-based parallelism in interviews
- I understand how build systems schedule work
- I’m ready to learn coroutines (C++20)
- I can read production task scheduler code
Submission / Completion Criteria
Minimum Viable Completion:
- Tasks with dependencies execute in correct order
- Results pass from producers to consumers
- Basic error handling (exceptions caught)
- Works with at least 2 worker threads
Full Completion:
- Diamond dependencies work with shared_future
- Cascading cancellation on failure
- Timing and progress reporting
- Comprehensive test suite
- Clean public API
Excellence (Going Above & Beyond):
- .then() continuation support
- Task priorities
- Timeout and retry logic
- Performance benchmarks
- Comparison with production systems (Folly, TBB)
This guide was expanded from LEARN_CPP_CONCURRENCY_AND_PARALLELISM.md. For the complete learning path, see the project index.