Project 10: Async Task Framework


Quick Reference

Attribute Value
Project Number 10 of 17
Difficulty Master
Time Estimate 4-6 weeks
Language C++ (C++20 required)
Alternative Languages Rust (async/await), JavaScript (for comparison)
Prerequisites Project 9 (Coroutine Generator) completed, Project 3 (Thread Pool) helpful
Knowledge Area C++20 Coroutines / Async Runtime
Key Concepts Awaiter protocol, promise_type, task scheduling, continuation passing
Main Book “C++ Concurrency in Action, Second Edition” by Anthony Williams
Coolness Level Level 5: Pure Magic
Business Potential Open Core Infrastructure

Learning Objectives

By completing this project, you will:

  1. Master the Awaiter Protocol: Understand await_ready(), await_suspend(), and await_resume() at a deep level
  2. Build a Task Abstraction: Create a general-purpose awaitable type that represents deferred computation
  3. Implement a Multi-threaded Runtime: Design a scheduler that runs coroutines across worker threads
  4. Handle Complex Suspension: Manage coroutines that wait for other coroutines
  5. Build Composable Primitives: Create when_all, spawn, sleep, and block_on combinators
  6. Master Exception Propagation: Store and rethrow exceptions across suspension points
  7. Understand Continuation Passing: See how co_await transforms code into callback chains

Theoretical Foundation

Core Concepts

The Awaiter Protocol: The Heart of co_await

When you write co_await expr, the compiler transforms it into calls to three methods on an “awaiter” object:

                    co_await expression
                           |
                           v
              +------------------------+
              | Get awaiter from expr  |
              | (via operator co_await |
              | or directly if awaiter)|
              +------------------------+
                           |
                           v
              +------------------------+
              | awaiter.await_ready()  |
              | Returns bool           |
              +------------------------+
                     |           |
                true |           | false
                     v           v
              +---------+  +---------------------------+
              | Continue|  | awaiter.await_suspend(h)  |
              | without |  | h = current coroutine     |
              | suspend |  +---------------------------+
              +---------+         |
                     |            | (suspend happens here)
                     |            |
                     v            v
              +----------------------------------+
              | awaiter.await_resume()          |
              | Returns the result of co_await  |
              +----------------------------------+

Key insight: await_suspend receives the handle of the waiting coroutine. This is how you chain coroutines together - you store this handle and resume it when your work completes.
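
To see the protocol fire, here is a minimal self-contained sketch (the names Detached and TracingAwaiter are illustrative only and not part of the framework you will build): a throwaway eager coroutine type plus an awaiter that reports itself as ready, so no scheduler is involved.

#include <coroutine>
#include <exception>
#include <iostream>

// Minimal coroutine return type: starts eagerly, frame frees itself on completion.
struct Detached {
    struct promise_type {
        Detached get_return_object() { return {}; }
        std::suspend_never initial_suspend() noexcept { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };
};

// Toy awaiter that never suspends; it just logs each protocol step.
struct TracingAwaiter {
    int value;
    bool await_ready() const noexcept {
        std::cout << "await_ready -> true (result already available)\n";
        return true;                              // true: skip suspension entirely
    }
    void await_suspend(std::coroutine_handle<>) const noexcept {
        // Not reached here, because await_ready() returned true.
    }
    int await_resume() const noexcept {
        std::cout << "await_resume -> " << value << "\n";
        return value;                             // becomes the value of co_await
    }
};

Detached demo() {
    int x = co_await TracingAwaiter{42};          // compiler drives the three calls
    std::cout << "co_await produced " << x << "\n";
}

int main() { demo(); }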

The Promise Type: Coroutine State Machine

Every coroutine has an associated promise_type that controls its behavior:

+-------------------------------------------------------------------+
|                    COROUTINE FRAME (heap-allocated)               |
+-------------------------------------------------------------------+
|  +--------------------+  +---------------------+  +--------------+|
|  | Promise Object     |  | Parameters (copies) |  | Local Vars   ||
|  |                    |  |                     |  |              ||
|  | - result (T)       |  | - captured args     |  | - your vars  ||
|  | - exception_ptr    |  |                     |  |              ||
|  | - waiting_handle   |  |                     |  |              ||
|  +--------------------+  +---------------------+  +--------------+|
|                                                                   |
|  +-------------------------------------------------------------+ |
|  |                    SUSPENSION POINT STATE                    | |
|  |  - Instruction pointer (where to resume)                     | |
|  |  - Register state                                            | |
|  +-------------------------------------------------------------+ |
+-------------------------------------------------------------------+

The promise_type customization points:

Method When Called Purpose
get_return_object() At coroutine creation Returns the Task to caller
initial_suspend() Before coroutine body Return suspend_always for lazy start
final_suspend() After coroutine completes Resume waiters, cleanup
return_value(T) On co_return val Store the result
unhandled_exception() On uncaught exception Store for later rethrow

Continuation Passing Style (CPS) Under the Hood

C++20 coroutines are syntactic sugar for continuation-passing style. Consider:

Task<int> compute() {
    auto x = co_await get_value();    // Point A
    auto y = co_await transform(x);    // Point B
    co_return x + y;                   // Point C
}

This is conceptually equivalent to:

void compute_cps(Continuation<int> final_cont) {
    get_value_cps([final_cont](int x) {           // After Point A
        transform_cps(x, [x, final_cont](int y) {  // After Point B
            final_cont(x + y);                      // Point C
        });
    });
}

The coroutine machinery manages these continuations for you.

The Scheduler: Orchestrating Coroutines

An async runtime needs to manage:

+-----------------------------------------------------------------------+
|                           RUNTIME                                      |
+-----------------------------------------------------------------------+
|                                                                        |
|  +------------------+     +------------------+                         |
|  |   Ready Queue    |     |   Timer Heap     |                         |
|  |   (runnable      |     |   (sleeping      |                         |
|  |    coroutines)   |     |    coroutines)   |                         |
|  +--------+---------+     +--------+---------+                         |
|           |                        |                                   |
|           v                        v                                   |
|  +------------------+     +------------------+                         |
|  | Worker Thread 1  |     |   Timer Thread   |                         |
|  | - pop & resume   |     | - wake expired   |                         |
|  +------------------+     +------------------+                         |
|           |                                                            |
|  +------------------+                                                  |
|  | Worker Thread 2  |                                                  |
|  | - pop & resume   |                                                  |
|  +------------------+                                                  |
|           |                                                            |
|  +------------------+                                                  |
|  | Worker Thread N  |                                                  |
|  | - pop & resume   |                                                  |
|  +------------------+                                                  |
|                                                                        |
+-----------------------------------------------------------------------+

Why This Matters

Industry Context: Async programming is the foundation of high-performance servers, game engines, and any I/O-bound application. Tokio (Rust), asyncio (Python), and libuv (Node.js) all implement similar concepts. By building your own, you’ll understand what these frameworks do internally.

Career Impact: Understanding async runtimes is essential for:

  • Writing efficient network servers
  • Building game engines and real-time applications
  • Contributing to high-performance systems
  • Debugging subtle async bugs in production

Performance Numbers:

  • Tokio handles 500k+ concurrent connections on a single machine
  • Async I/O reduces context switch overhead by 10-100x vs threads
  • Coroutine switching: ~10-50 nanoseconds (vs ~1000ns for thread switch)

Historical Context

Evolution of Async Programming:

  1. 1960s-1980s: Callback-based event loops (select, poll)
  2. 1990s: Thread-per-connection (simple but doesn’t scale)
  3. 2000s: Event-driven callbacks (Node.js, Nginx)
  4. 2010s: Async/await syntax (C#, Python, JavaScript, Rust)
  5. 2020: C++20 brings coroutines to C++

C++20 coroutines are unique: they provide the machinery (compiler transforms, handle types) but not the runtime (scheduler, I/O). This is intentional - it allows library authors to build custom runtimes optimized for their use case.

Common Misconceptions

Misconception Reality
“Coroutines are like threads” Coroutines are cooperative (explicit yield), threads are preemptive (OS schedules)
“co_await blocks the thread” co_await suspends the coroutine, the thread moves on to other work
“Coroutines are always faster” Coroutines reduce context switches but add allocation overhead
“C++ coroutines are complete” C++ provides primitives only; you must build or import the runtime
“Stackless means no stack” Stackless coroutines use the thread stack when running, heap when suspended

Project Specification

What You Will Build

An async task framework with the following components:

  1. Task: An awaitable type representing a deferred computation
  2. Runtime: A multi-threaded scheduler that runs tasks to completion
  3. Primitives: sleep(), spawn(), when_all(), block_on()

User-Facing API

// User code - looks like modern async/await!
Task<std::string> fetch_data(std::string url) {
    auto connection = co_await async_connect(url);
    auto response = co_await connection.get("/api/data");
    co_return response.body();
}

Task<int> compute_result(int x) {
    co_await sleep(100ms);  // Non-blocking!
    co_return x * 2;
}

Task<void> main_task() {
    // Run tasks concurrently
    auto [data, result] = co_await when_all(
        fetch_data("https://api.example.com"),
        compute_result(21)
    );

    std::cout << "Data: " << data << ", Result: " << result << "\n";
}

int main() {
    // Run the async runtime
    Runtime runtime(4);  // 4 worker threads
    runtime.block_on(main_task());
}

Expected Output

$ ./async_demo
[Runtime] Starting with 4 worker threads
[Worker-1] Running main_task
[Worker-2] Running fetch_data (spawned)
[Worker-3] Running compute_result (spawned)
[Worker-3] compute_result sleeping...
[Worker-2] fetch_data: connected to api.example.com
[Worker-3] compute_result done: 42
[Worker-2] fetch_data done: {"status": "ok"}
[Worker-1] main_task resuming with results
Data: {"status": "ok"}, Result: 42
[Runtime] All tasks complete

Core Features to Implement

Feature Description Difficulty
Task<T> Basic awaitable task with result storage High
Task<void> Void specialization (no result) Medium
sleep(duration) Non-blocking timer-based suspension Medium
spawn(task) Launch task without waiting Medium
when_all(tasks...) Wait for multiple tasks concurrently High
block_on(task) Run until task completes (entry point) High
Exception propagation Store and rethrow across suspensions High
Multi-threaded runtime Run tasks on thread pool High

Solution Architecture

Task State Machine

+-----------------------------------------------------------------------+
|                         TASK LIFECYCLE                                 |
+-----------------------------------------------------------------------+
|                                                                        |
|                        +------------------+                            |
|                        |    CREATED       |                            |
|                        | (Task returned,  |                            |
|                        |  not started)    |                            |
|                        +--------+---------+                            |
|                                 |                                      |
|                                 | co_await task                        |
|                                 | (or block_on)                        |
|                                 v                                      |
|                        +------------------+                            |
|                        |    RUNNING       |                            |
|                        | (executing on    |                            |
|                        |  worker thread)  |                            |
|                        +--------+---------+                            |
|                                 |                                      |
|        +------------------------+------------------------+             |
|        |                        |                        |             |
|        | co_await               | co_await               | co_return   |
|        | (ready)                | (not ready)            | or exception|
|        v                        v                        v             |
| +-------------+          +------------------+     +------------------+ |
| | Continue    |          |   SUSPENDED      |     |   COMPLETED      | |
| | (no suspend)|          | (waiting for     |     | (result or       | |
| +-------------+          |  another task)   |     |  exception ready)| |
|        |                 +--------+---------+     +--------+---------+ |
|        |                          |                        |           |
|        +------------------------->+                        |           |
|                                   |                        |           |
|                        awaited task completes              |           |
|                                   |                        |           |
|                                   v                        |           |
|                        +------------------+                |           |
|                        |    RESUMED       |                |           |
|                        | (back in queue)  |<---------------+           |
|                        +------------------+  (resume waiter)           |
|                                                                        |
+-----------------------------------------------------------------------+

Runtime Architecture

+-----------------------------------------------------------------------+
|                           RUNTIME INTERNALS                            |
+-----------------------------------------------------------------------+
|                                                                        |
|  +-----------------------+                                             |
|  |    block_on(task)     | <-- Entry point from main()                 |
|  +-----------+-----------+                                             |
|              |                                                         |
|              | 1. Create workers                                       |
|              | 2. Schedule initial task                                |
|              | 3. Wait for completion                                  |
|              v                                                         |
|  +-----------------------------------------------------------------------+
|  |                      SHARED STATE                                    |
|  | +-------------------+  +-------------------+  +-------------------+  |
|  | |    Ready Queue    |  |   Timer Heap      |  |  Completion Flag  |  |
|  | | (thread-safe)     |  | (wake-up times)   |  | (for block_on)    |  |
|  | |                   |  |                   |  |                   |  |
|  | | [task1] [task2]   |  | [(time, handle)]  |  | atomic<bool>      |  |
|  | +-------------------+  +-------------------+  +-------------------+  |
|  +-----------------------------------------------------------------------+
|              |                     |                      |            |
|              v                     v                      v            |
|  +-----------------------------------------------------------------------+
|  |                        WORKER THREADS                                |
|  |                                                                       |
|  |  Worker 1              Worker 2              Worker N                |
|  |  +----------+          +----------+          +----------+            |
|  |  | loop {   |          | loop {   |          | loop {   |            |
|  |  |   pop()  |          |   pop()  |          |   pop()  |            |
|  |  |   resume |          |   resume |          |   resume |            |
|  |  | }        |          | }        |          | }        |            |
|  |  +----------+          +----------+          +----------+            |
|  |                                                                       |
|  +-----------------------------------------------------------------------+
|              |                                                          |
|              v                                                          |
|  +-----------------------------------------------------------------------+
|  |                      TIMER THREAD                                    |
|  |  +----------------------------------------------------------------+  |
|  |  | loop {                                                         |  |
|  |  |   wait_until(next_timer)                                       |  |
|  |  |   for expired timers: push to ready queue                      |  |
|  |  | }                                                              |  |
|  |  +----------------------------------------------------------------+  |
|  +-----------------------------------------------------------------------+
|                                                                        |
+-----------------------------------------------------------------------+

Key Components

1. Task - The Awaitable Wrapper

template<typename T>
class Task {
public:
    struct promise_type {
        std::optional<T> result;
        std::exception_ptr exception;
        std::coroutine_handle<> waiting;  // Who's waiting for us?

        Task get_return_object();
        std::suspend_always initial_suspend() noexcept;  // Lazy start
        auto final_suspend() noexcept;  // Resume waiter
        void return_value(T value);
        void unhandled_exception();
    };

    // Make Task awaitable
    auto operator co_await();

private:
    std::coroutine_handle<promise_type> handle_;
};

2. Runtime - The Scheduler

class Runtime {
public:
    explicit Runtime(size_t num_threads);
    ~Runtime();

    template<typename T>
    T block_on(Task<T> task);  // Run task, block until complete

    void schedule(std::coroutine_handle<> handle);  // Add to ready queue
    void schedule_after(std::coroutine_handle<> handle,
                       std::chrono::milliseconds delay);

private:
    std::vector<std::thread> workers_;
    ThreadSafeQueue<std::coroutine_handle<>> ready_queue_;
    TimerHeap timer_heap_;
    std::atomic<bool> shutdown_;
};

3. Awaiters - Suspension Points

// Sleep awaiter
struct SleepAwaiter {
    Runtime& runtime;
    std::chrono::milliseconds duration;

    bool await_ready() { return duration <= 0ms; }

    void await_suspend(std::coroutine_handle<> h) {
        runtime.schedule_after(h, duration);
    }

    void await_resume() {}
};

// Task awaiter (for co_await another_task)
template<typename T>
struct TaskAwaiter {
    std::coroutine_handle<typename Task<T>::promise_type> task;

    bool await_ready() { return task.done(); }

    std::coroutine_handle<> await_suspend(std::coroutine_handle<> waiting) {
        task.promise().waiting = waiting;
        return task;  // Start/resume the awaited task
    }

    T await_resume() {
        if (task.promise().exception)
            std::rethrow_exception(task.promise().exception);
        return std::move(*task.promise().result);
    }
};

Data Structures

Structure Implementation Purpose
Ready Queue Lock-free MPMC or mutex-protected deque Store runnable coroutines
Timer Heap Min-heap by wake time Track sleeping coroutines
Task Handle std::coroutine_handle<promise_type> Reference to coroutine frame
Promise Custom struct in Task Store result, exception, waiting handle

Implementation Guide

Phase 1: Basic Task (Week 1)

Goal: Create a Task that can be awaited and returns a value.

Step 1.1: Define the Promise Type

template<typename T>
class Task {
public:
    struct promise_type {
        std::optional<T> result;
        std::exception_ptr exception;
        std::coroutine_handle<> waiting;

        Task get_return_object() {
            return Task{std::coroutine_handle<promise_type>::from_promise(*this)};
        }

        // Lazy: don't start until awaited
        std::suspend_always initial_suspend() noexcept { return {}; }

        // On completion, resume whoever was waiting
        auto final_suspend() noexcept {
            struct FinalAwaiter {
                bool await_ready() noexcept { return false; }

                std::coroutine_handle<> await_suspend(
                    std::coroutine_handle<promise_type> me) noexcept
                {
                    if (me.promise().waiting)
                        return me.promise().waiting;
                    return std::noop_coroutine();
                }

                void await_resume() noexcept {}
            };
            return FinalAwaiter{};
        }

        void return_value(T value) {
            result = std::move(value);
        }

        void unhandled_exception() {
            exception = std::current_exception();
        }
    };

    // ... (handle management, awaiter) ...
};

Key insight: final_suspend is where we resume the waiting coroutine. By returning the waiting handle from await_suspend, control transfers directly to the waiter (symmetric transfer) without going back through the scheduler.

Step 1.2: Make Task Awaitable

template<typename T>
class Task {
    // ... promise_type above ...

public:
    auto operator co_await() {
        struct Awaiter {
            std::coroutine_handle<promise_type> task;

            bool await_ready() {
                return task.done();
            }

            std::coroutine_handle<> await_suspend(std::coroutine_handle<> waiting) {
                task.promise().waiting = waiting;
                return task;  // Resume the task we're waiting for
            }

            T await_resume() {
                if (task.promise().exception)
                    std::rethrow_exception(task.promise().exception);
                return std::move(*task.promise().result);
            }
        };
        return Awaiter{handle_};
    }

private:
    std::coroutine_handle<promise_type> handle_;

    explicit Task(std::coroutine_handle<promise_type> h) : handle_(h) {}

public:
    ~Task() {
        if (handle_) handle_.destroy();
    }

    // Move-only
    Task(Task&& other) noexcept : handle_(std::exchange(other.handle_, {})) {}
    Task& operator=(Task&& other) noexcept {
        if (this != &other) {
            if (handle_) handle_.destroy();
            handle_ = std::exchange(other.handle_, {});
        }
        return *this;
    }

    Task(const Task&) = delete;
    Task& operator=(const Task&) = delete;
};

Step 1.3: Test Basic Chaining

Task<int> add(int a, int b) {
    co_return a + b;
}

Task<int> multiply_by_two(int x) {
    auto result = co_await add(x, x);
    co_return result;
}

// In main (you'll need block_on later; for now, resume the handle manually,
// as sketched below)
auto task = multiply_by_two(21);
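
Until the runtime exists, you can drive the outermost task by hand. A minimal sketch, assuming you temporarily expose the handle for tests (say, a raw_handle() accessor or a friend declaration; neither is part of the final design):

auto task = multiply_by_two(21);
auto h = task.raw_handle();   // hypothetical test-only accessor
h.resume();                   // starts multiply_by_two; co_await add(x, x) chains via
                              // symmetric transfer, so the whole chain finishes here
assert(h.done() && *h.promise().result == 42);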

Phase 2: Single-Threaded Runtime (Week 2)

Goal: Build a scheduler that runs tasks on a single thread.

Step 2.1: Ready Queue

class Runtime {
public:
    void schedule(std::coroutine_handle<> handle) {
        ready_queue_.push(handle);
    }

    void run_until_empty() {
        while (!ready_queue_.empty()) {
            auto handle = ready_queue_.front();
            ready_queue_.pop();
            handle.resume();
        }
    }

private:
    std::queue<std::coroutine_handle<>> ready_queue_;
};

Step 2.2: Block On Implementation

template<typename T>
T Runtime::block_on(Task<T> task) {
    std::optional<T> result;
    std::exception_ptr exception;
    bool done = false;

    // Create a "root" task that captures completion
    // (caution: lambda coroutines that capture can dangle, because the closure
    //  object is destroyed while the coroutine is suspended; a named helper
    //  coroutine taking references is safer in real code)
    auto root_task = [&]() -> Task<void> {
        try {
            result = co_await std::move(task);
        } catch (...) {
            exception = std::current_exception();
        }
        done = true;
    }();

    // Start the root task (assumes block_on can reach the handle, e.g. via a
    // friend declaration or an accessor on Task)
    schedule(root_task.handle_);

    // Run until done
    while (!done) {
        run_one();  // Pop and resume one task
    }

    if (exception) std::rethrow_exception(exception);
    return std::move(*result);
}
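
block_on calls run_one(), which is not shown above. For the single-threaded phase it can be as small as this sketch (using the same std::queue member as Step 2.1):

void Runtime::run_one() {
    if (!ready_queue_.empty()) {
        auto handle = ready_queue_.front();
        ready_queue_.pop();
        handle.resume();
    }
    // A real runtime would block on the queue or poll timers here instead of
    // spinning while the queue is empty.
}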

Phase 3: Timer Support (Week 3)

Goal: Implement non-blocking sleep().

Step 3.1: Timer Heap

class TimerHeap {
public:
    using TimePoint = std::chrono::steady_clock::time_point;

    void insert(TimePoint wake_time, std::coroutine_handle<> handle) {
        std::lock_guard lock(mutex_);
        heap_.push({wake_time, handle});
        cv_.notify_one();
    }

    std::vector<std::coroutine_handle<>> pop_expired() {
        std::lock_guard lock(mutex_);
        std::vector<std::coroutine_handle<>> expired;
        auto now = std::chrono::steady_clock::now();

        while (!heap_.empty() && heap_.top().wake_time <= now) {
            expired.push_back(heap_.top().handle);
            heap_.pop();
        }
        return expired;
    }

private:
    struct TimerEntry {
        TimePoint wake_time;
        std::coroutine_handle<> handle;

        bool operator>(const TimerEntry& other) const {
            return wake_time > other.wake_time;
        }
    };

    std::priority_queue<TimerEntry,
                       std::vector<TimerEntry>,
                       std::greater<>> heap_;
    std::mutex mutex_;
    std::condition_variable cv_;
};

Step 3.2: Sleep Awaiter

struct SleepAwaiter {
    Runtime& runtime;
    std::chrono::milliseconds duration;

    bool await_ready() {
        return duration <= std::chrono::milliseconds{0};
    }

    void await_suspend(std::coroutine_handle<> h) {
        auto wake_time = std::chrono::steady_clock::now() + duration;
        // Equivalent to runtime.schedule_after(h, duration); assumes the awaiter
        // can reach the timer heap (friendship or a public scheduling method).
        runtime.timer_heap_.insert(wake_time, h);
    }

    void await_resume() {}
};

auto sleep(std::chrono::milliseconds duration) {
    return SleepAwaiter{current_runtime(), duration};
}

Step 3.3: Timer Thread

void Runtime::timer_thread_func() {
    while (!shutdown_) {
        auto expired = timer_heap_.pop_expired();
        for (auto h : expired) {
            schedule(h);
        }

        // Wait until next timer or new timer added
        timer_heap_.wait_for_next();
    }
}
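
wait_for_next() is used above but was not defined in Step 3.1. A possible version, assuming a shutdown_ flag is added to TimerHeap next to the existing mutex_/cv_/heap_ members:

void TimerHeap::wait_for_next() {
    std::unique_lock lock(mutex_);
    if (heap_.empty()) {
        // Nothing scheduled: sleep until insert() or shutdown() notifies us.
        cv_.wait(lock, [this] { return shutdown_ || !heap_.empty(); });
    } else {
        // Sleep until the earliest wake-up; insert() notifies us if an even
        // earlier timer arrives in the meantime.
        cv_.wait_until(lock, heap_.top().wake_time);
    }
}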

Phase 4: Multi-Threaded Runtime (Week 4)

Goal: Run tasks across multiple worker threads.

Step 4.1: Thread-Safe Queue

template<typename T>
class ThreadSafeQueue {
public:
    void push(T item) {
        std::lock_guard lock(mutex_);
        queue_.push(std::move(item));
        cv_.notify_one();
    }

    std::optional<T> try_pop() {
        std::lock_guard lock(mutex_);
        if (queue_.empty()) return std::nullopt;
        T item = std::move(queue_.front());
        queue_.pop();
        return item;
    }

    T pop() {  // Blocking
        std::unique_lock lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty() || shutdown_; });
        if (shutdown_ && queue_.empty())
            throw std::runtime_error("Queue shutdown");
        T item = std::move(queue_.front());
        queue_.pop();
        return item;
    }

    void shutdown() {
        std::lock_guard lock(mutex_);
        shutdown_ = true;
        cv_.notify_all();
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool shutdown_ = false;
};

Step 4.2: Worker Threads

Runtime::Runtime(size_t num_threads) : shutdown_(false) {
    for (size_t i = 0; i < num_threads; ++i) {
        workers_.emplace_back([this, i] {
            while (!shutdown_) {
                try {
                    auto handle = ready_queue_.pop();
                    handle.resume();
                } catch (const std::exception& e) {
                    // Queue shutdown or error
                    break;
                }
            }
        });
    }

    // Start timer thread
    timer_thread_ = std::thread([this] { timer_thread_func(); });
}

Runtime::~Runtime() {
    shutdown_ = true;
    ready_queue_.shutdown();
    timer_heap_.shutdown();

    for (auto& worker : workers_) {
        worker.join();
    }
    timer_thread_.join();
}

Phase 5: Combinators (Week 5)

Goal: Implement spawn() and when_all().

Step 5.1: Spawn (Fire and Forget)

template<typename T>
void spawn(Task<T> task) {
    // Task runs independently, we don't wait for it
    // But we need to ensure it gets scheduled
    current_runtime().schedule(task.release_handle());
}

// In Task class:
std::coroutine_handle<promise_type> release_handle() {
    return std::exchange(handle_, {});
}

Warning: Spawned tasks need careful lifetime management. The coroutine frame must stay alive until completion.
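
One common way to satisfy that requirement (a sketch, not the only design) is to have spawn() hand the Task to a detached wrapper coroutine whose frame destroys itself when it finishes, so nothing dangles after the caller's Task variable goes away. Note that this version starts the wrapper inline on the calling thread; scheduling it onto the runtime's ready queue first is a natural refinement.

struct DetachedTask {
    struct promise_type {
        DetachedTask get_return_object() { return {}; }
        std::suspend_never initial_suspend() noexcept { return {}; }
        // suspend_never at the final point lets the frame free itself on completion.
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }  // fire-and-forget policy choice
    };
};

template <typename T>
void spawn(Task<T> task) {
    // The wrapper takes the Task by value, so the awaited frame stays alive
    // inside the wrapper's frame until it completes.
    [](Task<T> owned) -> DetachedTask {
        co_await std::move(owned);
    }(std::move(task));
}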

Step 5.2: When All (Parallel Composition)

template<typename... Tasks>
Task<std::tuple<typename Tasks::value_type...>> when_all(Tasks... tasks) {
    // This is complex! Simplified approach:
    // 1. Start all tasks
    // 2. Track completion count
    // 3. When all complete, resume the awaiter

    // ... implementation details ...
}

A simpler approach for two tasks:

template<typename T1, typename T2>
Task<std::tuple<T1, T2>> when_all(Task<T1> t1, Task<T2> t2) {
    // Run both concurrently by spawning one
    auto& runtime = current_runtime();

    std::optional<T1> r1;
    std::optional<T2> r2;
    std::exception_ptr e1, e2;
    std::atomic<int> remaining{2};
    std::coroutine_handle<> waiter;

    // Wrapper task for t1
    auto wrapper1 = [&]() -> Task<void> {
        try { r1 = co_await std::move(t1); }
        catch (...) { e1 = std::current_exception(); }
        if (--remaining == 0) runtime.schedule(waiter);
    }();

    // Wrapper task for t2
    auto wrapper2 = [&]() -> Task<void> {
        try { r2 = co_await std::move(t2); }
        catch (...) { e2 = std::current_exception(); }
        if (--remaining == 0) runtime.schedule(waiter);
    }();

    // ... suspend until both complete ...

    co_return std::make_tuple(std::move(*r1), std::move(*r2));
}
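
One way to fill the "... suspend until both complete ..." gap (a sketch): start remaining at 3 instead of 2 - one slot per wrapper plus one for the joining coroutine - and, after both wrappers have been started (for example by scheduling their handles on the runtime, an assumption not shown above), co_await an awaiter like the one below. Exactly one decrement can bring the counter to zero, so either the last wrapper resumes the waiter or the joiner skips suspension; waiter is published before the joiner's own decrement, so a wrapper that reaches zero always sees it set.

struct JoinAwaiter {
    std::atomic<int>& remaining;        // initialized to 3 in this variant
    std::coroutine_handle<>& waiter;

    bool await_ready() const noexcept { return false; }

    bool await_suspend(std::coroutine_handle<> h) noexcept {
        waiter = h;                     // publish before counting down
        // If our decrement is the last one, both wrappers already finished:
        // return false to continue immediately instead of suspending.
        return --remaining != 0;
    }

    void await_resume() const noexcept {}
};

// In when_all, after starting wrapper1 and wrapper2:
//     co_await JoinAwaiter{remaining, waiter};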

Phase 6: Polish and Optimization (Week 6)

Goals:

  1. Handle edge cases (empty when_all, exceptions)
  2. Add thread-local runtime access
  3. Performance tuning
  4. Documentation

Step 6.1: Thread-Local Runtime Access

inline thread_local Runtime* current_runtime_ptr = nullptr;

Runtime& current_runtime() {
    assert(current_runtime_ptr && "No runtime on this thread");
    return *current_runtime_ptr;
}

// In worker thread startup:
current_runtime_ptr = this;

Step 6.2: Exception Handling Completeness

Ensure exceptions propagate correctly through each of the following (a quick sanity-check sketch follows the list):

  • co_await chains
  • when_all (first exception wins, or aggregate?)
  • block_on (rethrows to caller)
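
A quick sanity check for the co_await-chain and block_on cases (a sketch that assumes the Task and Runtime built in the earlier phases):

Task<int> throws() {
    throw std::runtime_error("boom");
    co_return 0;   // never reached; present so this function is a coroutine
}

Task<void> check_propagation() {
    try {
        co_await throws();
        assert(false && "exception should have propagated");
    } catch (const std::runtime_error& e) {
        assert(std::string(e.what()) == "boom");
    }
}

// In a test: runtime.block_on(check_propagation());
// A task not awaited inside a try block should rethrow out of block_on instead.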

Testing Strategy

Unit Tests

Test Category What to Test
Task basics Create, await, return value
Task chaining A awaits B awaits C
Void tasks Task<void> works correctly
Exceptions Propagate through await chain
Move semantics Tasks are move-only

Integration Tests

// Test: sleep + when_all
Task<void> test_sleep_and_when_all() {
    auto start = std::chrono::steady_clock::now();

    co_await when_all(
        sleep(100ms),
        sleep(100ms),
        sleep(100ms)
    );

    auto elapsed = std::chrono::steady_clock::now() - start;
    // All three sleeps should run concurrently
    assert(elapsed < 150ms);
}

Stress Tests

// Test: many concurrent tasks
void stress_test_concurrent_tasks() {
    Runtime runtime(4);

    auto spawner = []() -> Task<void> {
        std::vector<Task<int>> tasks;
        for (int i = 0; i < 10000; ++i) {
            tasks.push_back([]() -> Task<int> {
                co_await sleep(1ms);
                co_return 42;
            }());
        }

        // Note: Task is lazy, so awaiting one-by-one runs the tasks
        // sequentially; use spawn()/when_all() to stress real concurrency.
        for (auto& t : tasks) {
            co_await std::move(t);
        }
    };

    runtime.block_on(spawner());
}

Sanitizer Testing

Always compile with:

# Thread Sanitizer
clang++ -std=c++20 -fsanitize=thread -g -O1 ...

# Address Sanitizer (for memory issues)
clang++ -std=c++20 -fsanitize=address -g -O1 ...

Common Pitfalls

Pitfall 1: Dangling Coroutine Handles

Problem: Resuming a destroyed coroutine.

// WRONG: Task destroyed while coroutine is suspended
{
    auto t = some_async_operation();
    // t goes out of scope, destroys coroutine
}
// Later: handle.resume() crashes!

// RIGHT: Keep task alive until completion
runtime.block_on(some_async_operation());

Solution: Ensure Task’s destructor only runs after coroutine completes, or implement reference counting.

Pitfall 2: Deadlock in block_on

Problem: Calling block_on from within a task on the same runtime.

// WRONG: Deadlock!
Task<void> bad_task() {
    // We're on a worker thread, calling block_on blocks this worker
    // If all workers are blocked, no progress is possible
    runtime.block_on(some_other_task());  // DEADLOCK
}

Solution: Never call block_on from inside a task. Use co_await instead.

Pitfall 3: Throwing from final_suspend

Problem: final_suspend is required to be non-throwing; an exception escaping it cannot be handled and, because the function is declared noexcept, terminates the program.

// WRONG: may throw!
auto final_suspend() noexcept {  // noexcept!
    if (something_bad)
        throw std::runtime_error("Oops");  // std::terminate at runtime
}

Solution: Keep final_suspend (and the awaiter it returns) noexcept and non-throwing. Store exceptions in the promise and rethrow them in await_resume.

Pitfall 4: Use After Move

Problem: Using a Task after moving it.

// WRONG
auto t = make_task();
spawn(std::move(t));
co_await t;  // t is empty now!

Solution: Leave moved-from tasks with a null handle (std::exchange already does this) and assert in operator co_await, as sketched below.
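
A minimal guard inside Task (sketch of the change to the Step 1.2 code):

auto operator co_await() {
    // A moved-from Task holds a null handle (std::exchange cleared it), so an
    // accidental co_await fails loudly here instead of crashing later.
    assert(handle_ && "co_await on an empty (moved-from) Task");
    // ... same Awaiter as in Step 1.2 ...
    return Awaiter{handle_};
}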

Pitfall 5: Race in when_all Completion

Problem: Two tasks complete at nearly the same time and both try to resume the waiter, or one completes before the waiter handle has been stored.

// WRONG: race condition if `remaining` is a plain int
if (--remaining == 0) {
    waiter.resume();  // two threads may both observe 0; `waiter` may not be set yet
}

Solution: Use an atomic counter so that exactly one completer observes the transition to zero, and publish the waiter handle before the counter can reach zero.

// RIGHT: fetch_sub returns the previous value, so only one thread sees 1
if (remaining.fetch_sub(1, std::memory_order_acq_rel) == 1) {
    runtime.schedule(waiter);
}

Pitfall 6: Forgetting to Handle void Tasks

Problem: Task<void> doesn’t have return_value.

// Compilation error with void:
void return_value(T value) { ... }  // T is void!

Solution: Specialize or use return_void().

// In promise_type for Task<void>:
void return_void() {}

Extensions and Challenges

Extension 1: Cancellation

Add cancellation token support:

Task<int> cancellable_operation(CancellationToken token) {
    while (!done) {
        if (token.is_cancelled()) {
            throw OperationCancelledException{};
        }
        co_await do_work();
    }
    co_return result;
}

// Usage
CancellationSource source;
auto task = cancellable_operation(source.token());
// ... later ...
source.cancel();

Extension 2: Timeout

Implement with_timeout:

template<typename T>
Task<std::optional<T>> with_timeout(Task<T> task,
                                    std::chrono::milliseconds timeout) {
    // Returns nullopt if timeout, otherwise the result
}

Extension 3: Select (First Completion)

template<typename... Tasks>
Task<std::variant<typename Tasks::value_type...>> select(Tasks... tasks) {
    // Returns when first task completes
}

Extension 4: Async I/O Integration

Integrate with OS async I/O:

Task<std::vector<char>> async_read_file(std::string path) {
    auto fd = co_await async_open(path, O_RDONLY);
    auto data = co_await async_read(fd, size);
    co_await async_close(fd);
    co_return data;
}

Extension 5: Work Stealing

Add work-stealing for better load balancing:

class WorkStealingRuntime {
    // Each worker has its own deque
    // Workers steal from others when their queue is empty
};

Resources

Primary References

Resource Description
Stanford Coroutine Tutorial David Mazières’ excellent deep-dive
cppcoro Library Production-quality coroutine library by Lewis Baker
Dima Korolev - C++20 Coroutines Practical implementation guide
“C++ Concurrency in Action, 2nd Ed” Anthony Williams - Chapter on coroutines

Code to Study

Project What to Learn
cppcoro Production patterns, task types
folly::coro Facebook’s coroutine library
libunifex Sender/receiver model

Supplemental Reading

Topic Book/Article Chapters
Coroutine theory “C++20: The Complete Guide” by Nicolai Josuttis Coroutines chapter
Async patterns “Concurrency in C#” by Stephen Cleary Parallel composition
Runtime design Tokio documentation Scheduler internals
Lock-free queues “The Art of Multiprocessor Programming” Chapters 10-11

Self-Assessment Checklist

Before moving on, verify you can:

Understanding

  • Explain what await_ready, await_suspend, and await_resume do
  • Describe how co_await task chains two coroutines together
  • Explain why final_suspend must be noexcept
  • Draw the state machine for a Task’s lifecycle
  • Explain how the runtime schedules coroutines

Implementation

  • Task<T> works for both value and void types
  • Exceptions propagate correctly through co_await chains
  • sleep() is non-blocking (other tasks run during sleep)
  • when_all() runs tasks concurrently
  • block_on() works from main thread
  • Multi-threaded runtime has no data races (TSan clean)

Testing

  • Unit tests cover all Task operations
  • Integration tests verify sleep timing
  • Stress tests with thousands of concurrent tasks pass
  • All tests pass under Thread Sanitizer

Extensions (Optional)

  • Cancellation tokens implemented
  • with_timeout combinator works
  • Async file I/O integrated

Submission/Completion Criteria

Your implementation is complete when:

Minimum Requirements

  1. Task functionality:
    • Lazy start (doesn’t run until awaited)
    • Proper result storage and retrieval
    • Exception propagation through await chain
    • Move-only semantics
  2. Runtime capabilities:
    • block_on(task) runs to completion
    • schedule(handle) adds to ready queue
    • Multiple worker threads
    • Proper shutdown handling
  3. Primitives:
    • sleep(duration) non-blocking delay
    • when_all(t1, t2) parallel execution
  4. Testing:
    • All tests pass
    • No Thread Sanitizer warnings
    • Performance benchmark included

Deliverables

async_framework/
  include/
    task.hpp           # Task<T> definition
    runtime.hpp        # Runtime class
    primitives.hpp     # sleep, when_all, spawn
    sync.hpp           # Thread-safe queue, timer heap
  src/
    runtime.cpp        # Runtime implementation
  tests/
    task_test.cpp      # Unit tests for Task
    runtime_test.cpp   # Integration tests
    stress_test.cpp    # Concurrent load tests
  examples/
    basic_async.cpp    # Simple usage example
    http_client.cpp    # (Optional) Async HTTP demo
  benchmark/
    throughput.cpp     # Tasks/second measurement
  CMakeLists.txt
  README.md

Grading Rubric (Self-Assessment)

Component Points Criteria
Task correctness 25 Awaiting, returning, exceptions all work
Multi-threaded runtime 25 No races, proper shutdown
Primitives (sleep, when_all) 20 Correct concurrent behavior
Testing coverage 15 Unit, integration, stress tests
Code quality 10 Clean code, good abstractions
Documentation 5 README, inline comments

Total: 100 points


Interview Questions They’ll Ask

Question 1: “Explain how co_await transforms code”

Answer: When the compiler sees co_await expr, it:

  1. Gets an awaiter from expr (via operator co_await or directly)
  2. Calls await_ready() - if true, continues without suspending
  3. If false, calls await_suspend(handle) where handle is the current coroutine
  4. The coroutine is now suspended; control returns to caller/resumer
  5. When resumed, await_resume() is called and its return value becomes the result of the expression
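
In rough pseudocode (simplified; the real lowering also handles await_transform on the promise and the three possible return types of await_suspend), auto x = co_await expr; becomes something like:

// Pseudocode, not compilable C++: <...> marks compiler-generated steps.
auto&& awaitable = expr;
auto&& awaiter   = /* awaitable itself, or awaitable.operator co_await() */;
if (!awaiter.await_ready()) {
    // <save this coroutine's resume point in its frame>
    awaiter.await_suspend(<handle of this coroutine>);
    // <return control to the caller / resumer>
    // <execution continues here when the coroutine is resumed>
}
auto x = awaiter.await_resume();   // value of the whole co_await expression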

Question 2: “What’s the difference between suspend_always and suspend_never?”

Answer: These are simple awaiter types:

  • suspend_always: await_ready() returns false, always suspends
  • suspend_never: await_ready() returns true, never suspends

They’re used in initial_suspend() and final_suspend() to control coroutine behavior.
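
For reference, the standard library defines them in <coroutine> as essentially:

struct suspend_never {
    constexpr bool await_ready() const noexcept { return true; }   // never suspend
    constexpr void await_suspend(std::coroutine_handle<>) const noexcept {}
    constexpr void await_resume() const noexcept {}
};

struct suspend_always {
    constexpr bool await_ready() const noexcept { return false; }  // always suspend
    constexpr void await_suspend(std::coroutine_handle<>) const noexcept {}
    constexpr void await_resume() const noexcept {}
};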

Question 3: “How does exception handling work across suspension points?”

Answer:

  1. If a coroutine throws before co_return, unhandled_exception() is called on the promise
  2. The promise stores std::current_exception() in an exception_ptr
  3. When the awaiting coroutine’s await_resume() is called, it checks for stored exception
  4. If present, it calls std::rethrow_exception() to propagate the exception

Question 4: “Why must final_suspend be noexcept?”

Answer: At final_suspend() the coroutine is in a special state - the body has finished but the frame hasn't been destroyed. If final_suspend() could throw:

  1. There's no way to handle the exception (we're past the coroutine body)
  2. The frame would be left in an inconsistent, half-destroyed state
  3. The standard therefore requires the final-suspend expression to be non-throwing; since the function is declared noexcept, throwing at runtime would call std::terminate

Therefore, all cleanup and exception handling must happen before final_suspend().

Question 5: “Design a system to handle 100,000 concurrent connections”

Answer: Use an async runtime with:

  1. Event loop with epoll/kqueue for I/O multiplexing
  2. Coroutines for each connection - suspend on I/O, resume when ready
  3. Thread pool (N workers, where N ~ CPU cores) to run coroutines
  4. Lock-free queues for work distribution
  5. Timer wheel for timeouts instead of one timer per connection

Key insight: 100k coroutine frames at ~1KB each = 100MB memory, vs 100k threads at ~1MB each = 100GB memory.


Learning Milestones

Milestone 1: Simple Task Completes

Checkpoint: A basic Task<int> can be created, awaited, and returns a value.

What You’ve Proven:

  • You understand promise_type’s role
  • You can implement operator co_await
  • The get_return_object/return_value chain works

Milestone 2: Co_await Chains Work

Checkpoint: Task A awaits Task B, B completes, A resumes and gets the result.

What You’ve Proven:

  • You understand continuation passing
  • The waiting handle mechanism works
  • final_suspend correctly resumes the waiter

Milestone 3: Multi-Threaded Executor Works

Checkpoint: Runtime with 4 threads runs tasks concurrently without races.

What You’ve Proven:

  • Thread-safe queue implementation is correct
  • Tasks can migrate between threads safely
  • Shutdown is clean (no deadlocks)

Milestone 4: When_all Works

Checkpoint: when_all(task1, task2) runs both concurrently and returns both results.

What You’ve Proven:

  • You can coordinate multiple coroutines
  • Atomic completion tracking is correct
  • Complex composition patterns work

Milestone 5: No Deadlocks or Races

Checkpoint: All tests pass under Thread Sanitizer, including stress tests.

What You’ve Proven:

  • Your mental model of the concurrency is correct
  • Edge cases are handled
  • You’ve built a production-quality foundation

Estimated completion time: 4-6 weeks of focused effort

This project is the culmination of your coroutine journey. Take your time - understanding async runtimes deeply will pay dividends throughout your career.