Project 8: Building a Custom Async Runtime (Waker/Executor)
Goal: Master the internals of async Rust by building your own mini-Tokio. You will implement the fundamental primitives that power every async runtime: Tasks, Wakers, and an Executor that efficiently polls futures to completion.
Project Metadata
- Main Programming Language: Rust
- Coolness Level: Level 4: Hardcore Tech Flex
- Difficulty: Level 5: Master
- Knowledge Area: Async Internals / Runtime Implementation
- Time Estimate: 2-3 weeks
- Prerequisites: Solid understanding of Futures, Pin, Arc/Weak, basic concurrency, familiarity with trait objects
Learning Objectives
By completing this project, you will:
- Understand the Future trait at the lowest level - Know exactly what poll() does and why it takes Pin<&mut Self> and returns Poll<T>
- Master the Waker/Context mechanism - Build Wakers from scratch using RawWaker and RawWakerVTable, understanding why type erasure via vtables is necessary
- Implement a working single-threaded Executor - Create the core polling loop that drives futures to completion without busy-waiting
- Build Arc-based Task management - Understand how tasks are shared between the executor and waker for efficient rescheduling
- Create async primitives from scratch - Implement a Timer future that demonstrates proper Waker integration
- Understand the Reactor/Executor split - Know which component handles I/O events vs which runs user code
- Optimize for CPU efficiency - Learn why naive polling burns CPU and how proper sleeping/waking prevents this
- Compare runtime architectures - Understand design differences between tokio, async-std, and smol
Deep Theoretical Foundation
The Future Trait: The Heart of Async Rust
The Future trait is deceptively simple, but understanding it deeply is essential:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T),
Pending,
}
Why does poll take Pin<&mut Self>?
Async functions are compiled into state machines. These state machines may contain self-references (pointers to their own fields). If the future were moved after creation, these internal pointers would become dangling. Pin guarantees the future won't move, making self-references safe.
ASYNC FUNCTION TRANSFORMATION
=============================
Source code:
async fn example() {
let data = String::from("hello");
some_io().await; // <-- Suspension point 1
another_io(&data).await; // <-- Suspension point 2
println!("{}", data);
}
Compiler generates a state machine:
enum ExampleFuture {
Start,
WaitingForSomeIo {
data: String,
some_io_future: SomeIoFuture,
},
WaitingForAnotherIo {
data: String,
another_io_future: AnotherIoFuture<'???>, // Borrows data!
},
Done,
}
The problem: In WaitingForAnotherIo, another_io_future contains
a reference to `data` which is INSIDE THE SAME STRUCT.
+-----------------------------------+
| WaitingForAnotherIo |
| +-------------------------------+ |
| | data: String |<--+
| | ptr: 0x5000 (heap) | |
| | len: 5 | |
| +-------------------------------+ |
| +-------------------------------+ |
| | another_io_future | |
| | data_ref: &String --------->+ (self-reference!)
| | state: ... |
| +-------------------------------+ |
+-----------------------------------+
If this struct is MOVED, data_ref becomes a dangling pointer!
Pin<&mut Self> prevents this move.
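To see the hazard in real code, here is a minimal sketch (not part of the project itself) of a self-referential struct made safe by pinning. The SelfRef type and its methods are invented for illustration; the compiler-generated state machines above have the same shape:

use std::pin::Pin;

struct SelfRef {
    data: String,
    // Points at `data` above. Valid only while this struct never moves.
    data_ptr: *const String,
}

impl SelfRef {
    fn new(text: &str) -> Pin<Box<Self>> {
        let mut boxed = Box::pin(SelfRef {
            data: String::from(text),
            data_ptr: std::ptr::null(),
        });
        // Taking the address is safe now: Pin<Box<_>> guarantees the value
        // will never move again, so the pointer stays valid.
        let ptr: *const String = &boxed.data;
        // SAFETY: we never move out of the pinned box after this point.
        unsafe { boxed.as_mut().get_unchecked_mut().data_ptr = ptr };
        boxed
    }

    fn data_via_ptr(self: Pin<&Self>) -> &String {
        // SAFETY: data_ptr points at self.data, and Pin guarantees
        // self has not moved since it was set.
        unsafe { &*self.data_ptr }
    }
}

fn main() {
    let s = SelfRef::new("hello");
    assert_eq!(s.as_ref().data_via_ptr(), "hello");
}

Without the Pin, an ordinary move of SelfRef (say, returning it by value) would leave data_ptr pointing at the old location.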
How async/await Desugars into Poll-Based State Machines
When you write async code, the compiler transforms it into a state machine that implements Future. Each .await point becomes a state transition:
// Source code
async fn two_steps() -> String {
let a = step_one().await; // State 0 -> 1
let b = step_two(a).await; // State 1 -> 2
format!("{}{}", a, b) // State 2 -> Done
}
STATE MACHINE DIAGRAM
=====================
+----------+ step_one +----------+
| State 0 | returns Ready(a) | State 1 |
| (Start) | -----------------------> | (Got 'a')|
+----------+ +----------+
| |
| step_one returns Pending | step_two returns Ready(b)
v v
+----------+ +----------+
| Return | | State 2 |
| Pending | | (Got 'b')|
+----------+ +----------+
|
v
+----------+
| Done |
| Ready() |
+----------+
Each poll() call:
1. Checks current state
2. Polls the inner future for that state
3. If Ready: transitions to next state
4. If Pending: stores progress, returns Pending
5. If Done: returns Ready(final_value)
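To make the desugaring concrete, here is a rough hand-written equivalent of that state machine. The real compiler output differs in detail, and Step is an invented placeholder future that completes immediately; note that because nothing here borrows across states, the whole enum happens to be Unpin (real generated futures usually are not, which is why poll takes Pin):

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// Placeholder for the inner futures; completes immediately with a value.
struct Step(&'static str);
impl Future for Step {
    type Output = String;
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<String> {
        Poll::Ready(self.0.to_string())
    }
}

// Roughly what the compiler generates for `two_steps`.
enum TwoSteps {
    Start,
    AwaitingOne(Step),
    AwaitingTwo { a: String, fut: Step },
    Done,
}

impl Future for TwoSteps {
    type Output = String;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        loop {
            match &mut *self {
                TwoSteps::Start => *self = TwoSteps::AwaitingOne(Step("a")),
                TwoSteps::AwaitingOne(fut) => match Pin::new(fut).poll(cx) {
                    // State 0 -> 1: store `a`, start the second future
                    Poll::Ready(a) => *self = TwoSteps::AwaitingTwo { a, fut: Step("b") },
                    Poll::Pending => return Poll::Pending,
                },
                TwoSteps::AwaitingTwo { a, fut } => match Pin::new(fut).poll(cx) {
                    Poll::Ready(b) => {
                        let out = format!("{}{}", a, b);
                        *self = TwoSteps::Done;
                        return Poll::Ready(out);
                    }
                    Poll::Pending => return Poll::Pending,
                },
                TwoSteps::Done => panic!("polled after completion"),
            }
        }
    }
}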
The poll() Function Signature Deep Dive
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>
Let's break down each component:
self: Pin<&mut Self>
- Exclusive mutable access to the future
- Cannot be moved (Pin guarantee)
- Allows modifying internal state
cx: &mut Context<'_>
- Contains the Waker for this task
- The future uses this to register for wake-ups
- We'll build this from scratch!
Poll<Self::Output>
- Poll::Ready(value) - Future completed with value
- Poll::Pending - Future not ready, will wake later
The Reactor/Executor Split Architecture
Modern async runtimes separate two concerns:
REACTOR/EXECUTOR ARCHITECTURE
=============================
+-------------------------------------------------------------------+
| USER CODE |
| async fn my_app() { |
| let data = fetch_url("...").await; |
| process(data).await; |
| } |
+-------------------------------------------------------------------+
|
| Spawned as Task
v
+-------------------------------------------------------------------+
| EXECUTOR |
| |
| Responsibilities: |
| - Maintains queue of ready tasks |
| - Polls futures when they're ready |
| - Creates Wakers for each task |
| - Manages task lifecycle (spawn/complete/cancel) |
| |
| +------------------------+ |
| | Ready Queue | Tasks waiting to be polled |
| | [Task A, Task C, ...] | |
| +------------------------+ |
| |
+-------------------------------------------------------------------+
^ |
| Waker::wake() | Register interest
| (reschedule task) | (epoll/kqueue/IOCP)
| v
+-------------------------------------------------------------------+
| REACTOR |
| |
| Responsibilities: |
| - Monitors I/O resources (sockets, files, timers) |
| - Uses OS primitives (epoll on Linux, kqueue on macOS) |
| - Wakes tasks when their I/O is ready |
| |
| +------------------------+ |
| | Registered Resources | |
| | socket 5 -> Task A | |
| | timer 12 -> Task B | |
| | socket 8 -> Task C | |
| +------------------------+ |
| |
+-------------------------------------------------------------------+
^
|
OS Kernel Events
(readable, writable, timer fired)
Executor responsibilities:
- Store spawned tasks
- Maintain a "ready queue" of tasks to poll
- Poll ready tasks, driving them toward completion
- Handle Waker creation and task rescheduling
Reactor responsibilities:
- Register I/O resources with the OS
- Wait for OS events (via epoll/kqueue/IOCP)
- Wake the appropriate tasks when events occur
- Manage timer registrations
In simpler runtimes (like what we'll build), these may be combined. In production runtimes like Tokio, they're separate for better performance and modularity.
Waker Internals: RawWaker, RawWakerVTable, Context
The Waker is how a future tells the executor "I'm ready to make progress." Let's understand the types:
// From std::task
pub struct Waker {
waker: RawWaker,
}
pub struct RawWaker {
data: *const (), // Pointer to our task data
vtable: &'static RawWakerVTable, // Function pointers
}
pub struct RawWakerVTable {
clone: unsafe fn(*const ()) -> RawWaker,
wake: unsafe fn(*const ()),
wake_by_ref: unsafe fn(*const ()),
drop: unsafe fn(*const ()),
}
pub struct Context<'a> {
waker: &'a Waker,
// ... other fields in newer Rust
}
Why this complex design with vtables?
Type erasure. The Waker needs to work with any task type from any executor, but it must have a fixed size (two pointers). The vtable provides dynamic dispatch without generics:
WAKER TYPE ERASURE VIA VTABLE
=============================
The Problem:
------------
Executors have different Task types:
- Tokio: tokio::task::Task<F>
- async-std: async_std::task::Task
- smol: smol::Task<T>
- Your executor: MyTask
But std::task::Waker must work with ALL of them!
It can't be generic (Waker<T>) because futures need a concrete type.
The Solution: VTable (Virtual Table)
------------------------------------
+------------------+ +-----------------------+
| RawWaker | | RawWakerVTable |
| +------------+ | | (static, lives forever)|
| | data: *() +---+--+ +-----------------------+
| +------------+ | | | clone: fn(*const ()) |
| | vtable ----+---+--+-->| wake: fn(*const ()) |
| +------------+ | | wake_by_ref: fn(...) |
+------------------+ | drop: fn(*const ()) |
+-----------------------+
|
| data points to
v
+------------------+
| Arc<MyTask> | (Executor-specific task type)
| +------------+ |
| | future | |
| | state | |
| | queue_ref | |
| +------------+ |
+------------------+
When waker.wake() is called:
1. Get the vtable from the RawWaker
2. Call vtable.wake(data)
3. The wake function knows data is Arc<MyTask>
4. Cast it back and reschedule the task!
This is the same pattern as dyn Trait, but manual!
Why Wakers Use VTables (Type Erasure Explained)
The Waker uses vtables for the same reason dyn Trait does: we need polymorphism without generics.
// If Waker were generic (it's NOT):
struct GenericWaker<T: Task> {
task: Arc<T>,
}
// Then Future::poll would need to be generic:
fn poll<T: Task>(self: Pin<&mut Self>, cx: &mut Context<T>) -> Poll<...>
// ^^^
// This would make Future impossible to use as a trait object!
// Instead, Waker uses type erasure:
struct Waker {
raw: RawWaker, // Fixed size: two pointers
}
// - First pointer: the data (our Arc<Task>)
// - Second pointer: vtable with functions that know the real type
The vtable pattern allows each executor to implement its own wake behavior while presenting a uniform interface to all futures.
Arc-Based Task Management Patterns
Tasks need to be shared between multiple owners:
- The executor (to poll the task)
- The Waker (to reschedule the task)
- The ready queue (to know what to poll next)
Arc (Atomic Reference Counting) provides thread-safe shared ownership:
ARC-BASED TASK SHARING
======================
+-------------------+
| Arc<Task> |
| +--------------+ |
| | strong: 3 | | (3 owners)
| | weak: 0 | |
| +--------------+ |
| | Future | |
| | State | |
| | Scheduler ref| |
| +--------------+ |
+-------------------+
^ ^ ^
| | |
+------------+ | +------------+
| | |
+---------+-----+ +------+------+ +-----+---------+
| Ready Queue | | Waker | | Executor |
| [Arc<Task>] | | (holds Arc) | | (task map) |
+--------------+ +-------------+ +---------------+
When wake() is called:
1. Waker holds Arc<Task>
2. It clones the Arc (atomic increment of refcount)
3. Pushes the cloned Arc to the ready queue
4. Executor pops from queue and polls
When task completes:
1. Remove from executor's task map
2. Drop from ready queue
3. Waker dropped
4. Refcount hits 0, task deallocated
The Poll Loop: When to Wake, When to Sleep
The executorâs main loop is critical for efficiency:
// NAIVE (BAD) executor loop - burns CPU!
loop {
for task in &tasks {
task.poll(); // Poll everything constantly
}
}
// SMART executor loop - sleeps when idle
loop {
// Only poll tasks that are READY
while let Some(task) = ready_queue.pop() {
task.poll(&waker);
}
// No ready tasks? Sleep until a Waker wakes us
if ready_queue.is_empty() {
wait_for_event(); // Block until wake() called
}
}
EXECUTOR MAIN LOOP
==================
+-------------+
| Start |
+------+------+
|
v
+------+------+
| Ready queue |
| empty? |
+------+------+
|
+-------+-------+
| No | Yes
v v
+------+------+ +-----+------+
| Pop task | | Sleep/ |
| from queue | | Park thread|
+------+------+ +-----+------+
| ^
v | Waker calls wake()
+------+------+ | (unparks thread)
| Poll task | |
+------+------+ |
| |
v |
+------+------+ |
| Poll:: | |
| Pending? | |
+------+------+ |
| |
+---+---+ |
| | |
v v |
Yes No |
| | |
| +---+---+ |
| | Remove | |
| | task | |
| +---+---+ |
| | |
+---+---+ |
| |
v |
+------+------+ |
| Continue +--------+
| loop |
+-------------+
Key insight: The executor ONLY polls tasks that have been woken.
Tasks that return Pending are NOT polled again until their
Waker's wake() method is called.
Comparison: OS Threads vs Green Threads vs Async
Understanding the tradeoffs helps you appreciate async's design:
CONCURRENCY MODEL COMPARISON
============================
1. OS THREADS (std::thread)
---------------------------
+------------+ +------------+ +------------+
| Thread 1 | | Thread 2 | | Thread 3 |
| Stack: 2MB | | Stack: 2MB | | Stack: 2MB |
| Registers | | Registers | | Registers |
+------------+ +------------+ +------------+
| | |
+-------+-------+-------+-------+
|
+-----+-----+
| OS Kernel |
| (scheduler)|
+-----------+
Pros: True parallelism, simple mental model
Cons: Heavy (MB per thread), expensive context switch
Limits: ~10K threads before system struggles
Use when: CPU-bound work, need parallelism
2. GREEN THREADS (Go goroutines, old Rust)
------------------------------------------
+--------+ +--------+ +--------+
| Green 1| | Green 2| | Green 3|
| 4KB | | 4KB | | 4KB | (growable stacks)
+--------+ +--------+ +--------+
| | |
+-----+----------+----------+-----+
| Runtime Scheduler |
| (M:N mapping to OS threads) |
+---------------------------------+
|
+-----+-----+
| OS Thread |
| Pool |
+-----------+
Pros: Lighter than OS threads, can have millions
Cons: Stack can grow unpredictably, GC interaction
Limits: Memory limited (4KB min each)
Use when: Many concurrent I/O tasks
3. ASYNC/AWAIT (Rust, JavaScript)
---------------------------------
+--------+ +--------+ +--------+
| Future1| | Future2| | Future3|
| ~100B | | ~200B | | ~50B | (known size at compile time)
+--------+ +--------+ +--------+
| | |
+-----+----------+----------+-----+
| Executor |
| (single or multi-threaded) |
+---------------------------------+
|
+-----+-----+
| 1-N OS |
| Threads |
+-----------+
Pros: Minimal memory per task, no hidden allocations
Cons: Everything must be async, colored functions
Limits: Millions of tasks easily
Use when: High-concurrency I/O-bound work
MEMORY COMPARISON (10,000 concurrent tasks):
--------------------------------------------
OS Threads: 10,000 * 2MB = 20 GB
Green Threads: 10,000 * 4KB = 40 MB
Async Futures: 10,000 * 200B = 2 MB (!)
Real-World Runtime Designs: Tokio, async-std, smol
Let's understand how production runtimes differ:
TOKIO ARCHITECTURE
==================
+--------------------------------------------------+
| Tokio |
| +----------------------------------------------+ |
| | Runtime Handle | |
| +----------------------------------------------+ |
| | |
| +----------------+----------------+ |
| | | | |
| +--+--+ +--+--+ +--+--+ |
| |Worker| |Worker| |Worker| | (thread pool)
| |Thread| |Thread| |Thread| |
| +--+--+ +--+--+ +--+--+ |
| | | | |
| +--+----------------+----------------+--+ |
| | Work-Stealing Scheduler | |
| +---------------------------------------+ |
| | |
| +--+--+ +--+--+ +--+--+ +--+--+ |
| |Queue| |Queue| |Queue| |Inject| |
| |(per | |(per | |(per | |Queue | |
| |worker) |worker) |worker) |(global)| |
| +-----+ +-----+ +-----+ +------+ |
| | |
| +---------------------------------------+ |
| | I/O Driver | |
| | (epoll/kqueue/IOCP integration) | |
| +---------------------------------------+ |
+--------------------------------------------------+
Key features:
- Work-stealing for load balancing
- Per-worker local queues (cache friendly)
- Global inject queue for spawning
- Integrated I/O driver
ASYNC-STD ARCHITECTURE
======================
- Similar to Tokio but different API
- Uses async-executor crate internally
- Aims for familiar std-like API
SMOL ARCHITECTURE
=================
- Minimalist design
- async-executor for task scheduling
- async-io for I/O integration
- Composed from smaller crates
- ~1000 lines of code vs Tokio's ~50,000
How Hardware Interrupts Connect to the Waker System
Understanding the full stack from hardware to your future:
FROM HARDWARE INTERRUPT TO FUTURE POLLING
==========================================
1. Network packet arrives at NIC
+-------------+
| Network Card|
+------+------+
|
v Hardware Interrupt
+------+------+
| CPU/Kernel | Interrupt handler runs
+------+------+
|
v Packet copied to kernel buffer
+------+------+
| Socket | Socket marked readable
| Buffer |
+------+------+
|
v epoll_wait() returns
+------+------+
| epoll/kqueue| "Socket 5 is readable!"
+------+------+
|
v Reactor notified
+------+------+
| Reactor | Looks up: Socket 5 -> Task 12
+------+------+
|
v waker.wake()
+------+------+
| Waker | Schedules Task 12
+------+------+
|
v Task added to ready queue
+------+------+
| Executor | Pops task, calls poll()
+------+------+
|
v Future::poll() called
+------+------+
| Your Future | Reads data from socket
+------+------+
|
v Returns Poll::Ready(data)
+------+------+
| Your Code | Continues with data!
+------+------+
Total latency: < 1 microsecond (typical)
Single-Threaded vs Multi-Threaded Executors
SINGLE-THREADED EXECUTOR
========================
+-------------------------------------+
| OS Thread |
| +---------------------------+ |
| | Executor | |
| | +---------+ +---------+ | |
| | | Task 1 | | Task 2 | | |
| | +---------+ +---------+ | |
| | +---------+ +---------+ | |
| | | Task 3 | | Task 4 | | |
| | +---------+ +---------+ | |
| +---------------------------+ |
+-------------------------------------+
Pros:
- No synchronization needed (no Arc, no Mutex)
- No Send/Sync bounds required on futures
- Predictable performance (no lock contention)
- Simpler to reason about
Cons:
- Cannot utilize multiple CPU cores
- Blocking one task blocks everything
- Limited by single core performance
Use for:
- Single-threaded WASM
- Simple CLI tools
- When Send isn't available
MULTI-THREADED EXECUTOR
=======================
+----------------+ +----------------+ +----------------+
| OS Thread 1 | | OS Thread 2 | | OS Thread 3 |
| +------------+ | | +------------+ | | +------------+ |
| | Worker | | | | Worker | | | | Worker | |
| | +--------+ | | | | +--------+ | | | | +--------+ | |
| | | Task 1 | | | | | | Task 4 | | | | | | Task 6 | | |
| | +--------+ | | | | +--------+ | | | | +--------+ | |
| | | Task 2 | | | | | | Task 5 | | | | | | Task 7 | | |
| | +--------+ | | | | +--------+ | | | | +--------+ | |
| | | Task 3 | | | | +------------+ | | +------------+ |
| | +--------+ | | +----------------+ +----------------+
| +------------+ |
+----------------+
| | |
+-------------------+-------------------+
|
+--------+--------+
| Shared State |
| (atomic queues) |
+-----------------+
Pros:
- Utilizes all CPU cores
- One blocked task doesn't stop others
- Better for CPU-intensive async work
Cons:
- Futures must be Send
- Need atomic operations for shared state
- Work-stealing adds complexity
- Cache contention possible
Use for:
- Web servers
- Data processing pipelines
- Anything I/O-bound at scale
Real World Outcome
You'll be able to run async code without tokio or async-std. You'll understand exactly how .await suspends a function and how timers trigger a "wake up".
Example Usage:
let mut executor = MyExecutor::new();
executor.spawn(async {
println!("Step 1: Starting");
Timer::after(Duration::from_secs(1)).await;
println!("Step 2: After 1 second");
Timer::after(Duration::from_millis(500)).await;
println!("Step 3: After another 500ms");
});
executor.spawn(async {
println!("Task 2: Hello from concurrent task!");
Timer::after(Duration::from_millis(250)).await;
println!("Task 2: Goodbye!");
});
executor.run(); // Blocks until all tasks complete
Expected Output:
$ cargo run --example demo
Compiling my-executor v0.1.0
Finished dev [unoptimized + debuginfo] target(s) in 2.31s
Running `target/debug/examples/demo`
=== Custom Async Runtime Demo ===
[Executor] Starting run loop
[Executor] Ready queue has 2 tasks
Step 1: Starting
Task 2: Hello from concurrent task!
[Executor] Task 0 returned Pending, registered waker
[Executor] Task 1 returned Pending, registered waker
[Executor] Ready queue empty, sleeping...
[Timer] 250ms timer fired, waking task 1
[Executor] Woken by timer, checking ready queue
Task 2: Goodbye!
[Executor] Task 1 completed!
[Timer] 500ms remaining timer fired, waking task 0
Step 2: After 1 second
[Executor] Task 0 returned Pending, registered waker
[Timer] 500ms timer fired, waking task 0
Step 3: After another 500ms
[Executor] Task 0 completed!
[Executor] All tasks complete, exiting run loop
=== Performance Metrics ===
Total runtime: 1.502s
Tasks executed: 2
Poll calls: 6
Wakeups: 4
CPU idle time: 1.499s (99.8%) <-- Not busy-waiting!
Peak memory: 4.2 KB
$ cargo run --example benchmark
Running `target/debug/examples/benchmark`
Spawning 10,000 timer tasks...
All tasks spawned in 3.2ms
Waiting for completion...
All 10,000 tasks completed in 1.021s
Memory usage: 2.1 MB
Average latency: 0.1ms per task wake
Complete Project Specification
Build a fully functional async executor with these components:
Core Requirements
- Task struct - Wraps a future with its execution state
- Waker implementation - Using RawWaker and RawWakerVTable
- Executor - The main run loop that polls tasks
- Ready queue - Thread-safe queue of tasks to poll
- Timer future - Demonstrates async/await integration
Stretch Goals
- Multi-threaded executor - Work-stealing scheduler
- I/O reactor - Integration with epoll/kqueue
- JoinHandle - Await task completion from other tasks
Solution Architecture
Task Struct Design
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
/// A spawned task containing a boxed future
struct Task {
/// The future to poll (boxed for type erasure)
future: Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send>>>>,
/// Reference to the executor's ready queue
/// Used by the Waker to reschedule this task
task_sender: crossbeam_channel::Sender<Arc<Task>>,
}
impl Task {
/// Create a new task wrapping a future
fn new<F>(future: F, task_sender: crossbeam_channel::Sender<Arc<Task>>) -> Arc<Task>
where
F: Future<Output = ()> + Send + 'static,
{
Arc::new(Task {
future: Mutex::new(Some(Box::pin(future))),
task_sender,
})
}
/// Poll the task once
fn poll(self: Arc<Self>) -> Poll<()> {
// Take the future out temporarily
let mut future_slot = self.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// Create a Waker that will reschedule this task
let waker = self.clone().into_waker();
let mut cx = Context::from_waker(&waker);
match future.as_mut().poll(&mut cx) {
Poll::Ready(()) => {
// Task complete, don't put future back
Poll::Ready(())
}
Poll::Pending => {
// Put the future back for next poll
*future_slot = Some(future);
Poll::Pending
}
}
} else {
// Future already completed or taken
Poll::Ready(())
}
}
}
Waker with RawWakerVTable
use std::task::{RawWaker, RawWakerVTable, Waker};
impl Task {
/// Convert this task into a Waker
fn into_waker(self: Arc<Self>) -> Waker {
let raw = Self::into_raw_waker(self);
unsafe { Waker::from_raw(raw) }
}
fn into_raw_waker(this: Arc<Self>) -> RawWaker {
// Convert Arc<Task> to raw pointer
let ptr = Arc::into_raw(this) as *const ();
RawWaker::new(ptr, &VTABLE)
}
}
/// The vtable for our Waker implementation
static VTABLE: RawWakerVTable = RawWakerVTable::new(
clone_waker, // Clone
wake, // Wake (by value)
wake_by_ref, // Wake (by reference)
drop_waker, // Drop
);
/// Clone the waker - increment Arc refcount
unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
let arc = Arc::from_raw(ptr as *const Task);
let cloned = Arc::clone(&arc);
std::mem::forget(arc); // Don't decrement original's refcount
Task::into_raw_waker(cloned)
}
/// Wake the task - schedule it for polling
unsafe fn wake(ptr: *const ()) {
let arc = Arc::from_raw(ptr as *const Task);
wake_task(&arc);
// arc is dropped here, decrementing refcount
}
/// Wake by reference - don't consume the waker
unsafe fn wake_by_ref(ptr: *const ()) {
let arc = Arc::from_raw(ptr as *const Task);
wake_task(&arc);
std::mem::forget(arc); // Don't decrement refcount
}
/// Drop the waker - decrement Arc refcount
unsafe fn drop_waker(ptr: *const ()) {
let _ = Arc::from_raw(ptr as *const Task);
// arc is dropped here
}
/// The actual wake logic - reschedule the task
fn wake_task(task: &Arc<Task>) {
// Send task to the ready queue
let _ = task.task_sender.send(Arc::clone(task));
}
WAKER VTABLE VISUALIZATION
==========================
When we create a RawWaker:
Arc<Task> RawWaker
+----------+ +-------------+
| refcount | | data -------+---> points to Arc<Task>
| Task { | | vtable -----+---> &VTABLE
| future | +-------------+
| sender |
| } |
+----------+
static VTABLE:
+-----------------------+
| clone: clone_waker | -> Clones the Arc, returns new RawWaker
| wake: wake | -> Sends task to ready queue, drops Arc
| wake_ref: wake_by_ref | -> Sends task to ready queue, keeps Arc
| drop: drop_waker | -> Drops the Arc
+-----------------------+
When waker.wake() is called:
1. Rust calls VTABLE.wake(data)
2. We reconstruct Arc<Task> from the raw pointer
3. We clone it and send to the ready queue
4. The Arc is dropped, decrementing refcount
Ready Queue Data Structure
use crossbeam_channel::{unbounded, Receiver, Sender};
use std::sync::atomic::{AtomicBool, Ordering};
/// A simple ready queue using crossbeam channels
struct ReadyQueue {
sender: Sender<Arc<Task>>,
receiver: Receiver<Arc<Task>>,
}
impl ReadyQueue {
fn new() -> Self {
let (sender, receiver) = unbounded();
ReadyQueue { sender, receiver }
}
/// Push a task to the queue
fn push(&self, task: Arc<Task>) {
let _ = self.sender.send(task);
}
/// Pop a task from the queue (blocking)
fn pop(&self) -> Option<Arc<Task>> {
self.receiver.recv().ok()
}
/// Try to pop without blocking
fn try_pop(&self) -> Option<Arc<Task>> {
self.receiver.try_recv().ok()
}
/// Check if empty
fn is_empty(&self) -> bool {
self.receiver.is_empty()
}
}
Executor Main Loop
/// The async executor
pub struct Executor {
ready_queue: ReadyQueue,
task_sender: Sender<Arc<Task>>,
}
impl Executor {
pub fn new() -> Self {
let ready_queue = ReadyQueue::new();
let task_sender = ready_queue.sender.clone();
Executor {
ready_queue,
task_sender,
}
}
/// Spawn a new task
pub fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
let task = Task::new(future, self.task_sender.clone());
self.ready_queue.push(task);
}
/// Run all tasks to completion
pub fn run(&self) {
loop {
// Poll all ready tasks
while let Some(task) = self.ready_queue.try_pop() {
match task.poll() {
Poll::Ready(()) => {
// Task complete, don't reschedule
}
Poll::Pending => {
// Task will be rescheduled by its Waker
// We don't put it back in the queue here
}
}
}
// Queue drained; check once more in case a waker raced with us
match self.ready_queue.receiver.try_recv() {
Ok(task) => {
// Got a task, poll it
if let Poll::Ready(()) = task.poll() {
// Task complete
}
}
Err(crossbeam_channel::TryRecvError::Empty) => {
// Nothing ready. A real executor would track the active task
// count and block on recv() until a Waker fires; this
// simplified loop just exits.
break;
}
Err(crossbeam_channel::TryRecvError::Disconnected) => {
break;
}
}
}
}
}
EXECUTOR MAIN LOOP PSEUDOCODE
=============================
fn run():
active_tasks = count of spawned tasks
loop:
# Phase 1: Poll all ready tasks
while task = ready_queue.try_pop():
result = task.poll()
if result == Ready:
active_tasks -= 1
if active_tasks == 0:
return # All done!
# If Pending, task's Waker will reschedule it
# Phase 2: Wait for more work
if active_tasks > 0:
task = ready_queue.pop() # Blocking wait
if task:
# Poll the task we just received
result = task.poll()
if result == Ready:
active_tasks -= 1
else:
return # No more tasks
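Here is a sketch of that pseudocode in Rust, assuming the Task and ReadyQueue types from the Solution Architecture above. The active_tasks counter is the piece the simplified run() earlier elides; it is what lets the loop block without hanging forever:

use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Poll;

pub struct CountingExecutor {
    ready_queue: ReadyQueue,
    active_tasks: AtomicUsize,
}

impl CountingExecutor {
    pub fn new() -> Self {
        CountingExecutor {
            ready_queue: ReadyQueue::new(),
            active_tasks: AtomicUsize::new(0),
        }
    }

    pub fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        self.active_tasks.fetch_add(1, Ordering::SeqCst);
        self.ready_queue
            .push(Task::new(future, self.ready_queue.sender.clone()));
    }

    pub fn run(&self) {
        while self.active_tasks.load(Ordering::SeqCst) > 0 {
            // Blocking pop: the thread sleeps here until some Waker sends a
            // task into the channel, so an idle executor burns no CPU.
            let Some(task) = self.ready_queue.pop() else { break };
            if let Poll::Ready(()) = task.poll() {
                self.active_tasks.fetch_sub(1, Ordering::SeqCst);
            }
            // On Pending, the task's Waker will re-send it when it's ready.
        }
    }
}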
Timer Wheel for Efficient Scheduling
For efficient timer management, we use a timer wheel:
TIMER WHEEL STRUCTURE
=====================
A timer wheel is like a clock with slots for each time unit.
Example: 8-slot wheel with 100ms per slot (800ms total)
Current time: 250ms
Current slot: 2 (250 / 100 = 2)
Slot 0 (0-99ms): []
Slot 1 (100-199ms): []
Slot 2 (200-299ms): [Waker A] <-- Current slot
Slot 3 (300-399ms): [Waker B, Waker C]
Slot 4 (400-499ms): []
Slot 5 (500-599ms): [Waker D]
Slot 6 (600-699ms): []
Slot 7 (700-799ms): [Waker E]
When we advance to 300ms (slot 3):
1. Fire all wakers in slot 2 (Waker A)
2. Move to slot 3
Benefits:
- O(1) insertion (just add to slot)
- O(1) expiration (process current slot)
- Good for many timers at similar times
For timers beyond one rotation, use hierarchical wheels:
- Wheel 1: milliseconds (8 slots x 100ms)
- Wheel 2: seconds (60 slots x 1s)
- Wheel 3: minutes (60 slots x 1min)
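To make the slot arithmetic concrete, here is a minimal single-level wheel sketch. The names and sizes (Wheel, SLOTS, TICK) are invented for the example, and deadlines beyond one rotation are ignored (that is exactly what the hierarchical levels above solve). The project itself uses the simpler BinaryHeap registry shown below:

use std::task::Waker;
use std::time::{Duration, Instant};

const SLOTS: usize = 8;
const TICK: Duration = Duration::from_millis(100);

struct Wheel {
    slots: [Vec<Waker>; SLOTS],
    start: Instant,
    current: usize, // number of ticks already processed
}

impl Wheel {
    fn new() -> Self {
        Wheel {
            slots: std::array::from_fn(|_| Vec::new()),
            start: Instant::now(),
            current: 0,
        }
    }

    /// O(1) insertion: compute the deadline's tick, map it to a slot.
    fn insert(&mut self, deadline: Instant, waker: Waker) {
        let tick = (deadline.duration_since(self.start).as_millis()
            / TICK.as_millis()) as usize;
        self.slots[tick % SLOTS].push(waker);
    }

    /// Advance to `now`, firing every waker in each slot we pass over.
    fn advance(&mut self, now: Instant) {
        let target = (now.duration_since(self.start).as_millis()
            / TICK.as_millis()) as usize;
        while self.current < target {
            self.current += 1;
            for waker in self.slots[self.current % SLOTS].drain(..) {
                waker.wake();
            }
        }
    }
}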
use std::collections::BinaryHeap;
use std::sync::Mutex;
use std::task::Waker;
use std::time::{Duration, Instant};
/// A timer entry in the wheel
struct TimerEntry {
deadline: Instant,
waker: Waker,
}
impl Ord for TimerEntry {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Reverse ordering so earliest deadline is popped first
other.deadline.cmp(&self.deadline)
}
}
impl PartialOrd for TimerEntry {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for TimerEntry {
fn eq(&self, other: &Self) -> bool {
self.deadline == other.deadline
}
}
impl Eq for TimerEntry {}
/// Simple timer wheel (using BinaryHeap for this example)
pub struct TimerWheel {
timers: Mutex<BinaryHeap<TimerEntry>>,
}
impl TimerWheel {
pub fn new() -> Self {
TimerWheel {
timers: Mutex::new(BinaryHeap::new()),
}
}
/// Register a timer
pub fn register(&self, deadline: Instant, waker: Waker) {
let mut timers = self.timers.lock().unwrap();
timers.push(TimerEntry { deadline, waker });
}
/// Check and fire expired timers
pub fn check_timers(&self) {
let now = Instant::now();
let mut timers = self.timers.lock().unwrap();
while let Some(entry) = timers.peek() {
if entry.deadline <= now {
let entry = timers.pop().unwrap();
entry.waker.wake();
} else {
break;
}
}
}
/// Get duration until next timer
pub fn next_deadline(&self) -> Option<Duration> {
let timers = self.timers.lock().unwrap();
timers.peek().map(|e| {
let now = Instant::now();
if e.deadline > now {
e.deadline - now
} else {
Duration::ZERO
}
})
}
}
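A quick usage sketch, driving the wheel by hand with the futures crate's no-op waker (the same helper the Phase 1 test below uses):

use std::time::{Duration, Instant};

fn main() {
    let wheel = TimerWheel::new();
    wheel.register(
        Instant::now() + Duration::from_millis(50),
        futures::task::noop_waker(),
    );

    // A real executor would park for exactly this long instead of sleeping.
    if let Some(remaining) = wheel.next_deadline() {
        std::thread::sleep(remaining);
    }
    wheel.check_timers(); // fires (and drops) the expired waker
}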
Phased Implementation Guide
Phase 1: Implement Future for a Simple Type
Goal: Understand Future::poll at the most basic level.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// A future that immediately returns a value
struct Ready<T> {
value: Option<T>,
}
impl<T> Ready<T> {
fn new(value: T) -> Self {
Ready { value: Some(value) }
}
}
/// Ready<T> holds no self-references, so it is safe to mark it Unpin.
/// Without this, `self.value.take()` below would not compile for a
/// generic T: getting `&mut` access through a Pin requires Self: Unpin.
impl<T> Unpin for Ready<T> {}
impl<T> Future for Ready<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
match self.value.take() {
Some(v) => Poll::Ready(v),
None => panic!("Future polled after completion"),
}
}
}
/// A future that returns Pending once, then Ready
struct YieldNow {
yielded: bool,
}
impl YieldNow {
fn new() -> Self {
YieldNow { yielded: false }
}
}
impl Future for YieldNow {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.yielded {
Poll::Ready(())
} else {
self.yielded = true;
// IMPORTANT: We must call wake to be polled again!
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
Test your implementation:
#[test]
fn test_ready_future() {
let future = Ready::new(42);
// Create a dummy waker
let waker = futures::task::noop_waker();
let mut cx = Context::from_waker(&waker);
// Poll the future
let mut pinned = Box::pin(future);
match pinned.as_mut().poll(&mut cx) {
Poll::Ready(v) => assert_eq!(v, 42),
Poll::Pending => panic!("Should be ready"),
}
}
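YieldNow deserves the same treatment. With a no-op waker the wake_by_ref call inside poll is simply ignored, so the test polls twice by hand:

#[test]
fn test_yield_now() {
    let waker = futures::task::noop_waker();
    let mut cx = Context::from_waker(&waker);

    let mut pinned = Box::pin(YieldNow::new());
    assert!(pinned.as_mut().poll(&mut cx).is_pending()); // first poll yields
    assert!(pinned.as_mut().poll(&mut cx).is_ready()); // second completes
}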
Phase 2: Build the Waker with RawWakerVTable
Goal: Implement a working Waker using the raw vtable API.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{RawWaker, RawWakerVTable, Waker};
/// A simple waker that counts how many times it was woken
static WAKE_COUNT: AtomicUsize = AtomicUsize::new(0);
fn counting_waker() -> Waker {
static VTABLE: RawWakerVTable = RawWakerVTable::new(
|ptr| RawWaker::new(ptr, &VTABLE), // clone
|_ptr| { WAKE_COUNT.fetch_add(1, Ordering::SeqCst); }, // wake
|_ptr| { WAKE_COUNT.fetch_add(1, Ordering::SeqCst); }, // wake_by_ref
|_ptr| {}, // drop
);
let raw = RawWaker::new(std::ptr::null(), &VTABLE);
unsafe { Waker::from_raw(raw) }
}
#[test]
fn test_counting_waker() {
WAKE_COUNT.store(0, Ordering::SeqCst);
let waker = counting_waker();
assert_eq!(WAKE_COUNT.load(Ordering::SeqCst), 0);
waker.wake_by_ref();
assert_eq!(WAKE_COUNT.load(Ordering::SeqCst), 1);
waker.wake(); // Consumes waker
assert_eq!(WAKE_COUNT.load(Ordering::SeqCst), 2);
}
Now implement a waker that actually reschedules a task:
use std::sync::Arc;
use crossbeam_channel::Sender;
struct TaskWaker {
task_id: usize,
sender: Sender<usize>,
}
impl TaskWaker {
fn into_waker(self: Arc<Self>) -> Waker {
let ptr = Arc::into_raw(self) as *const ();
let raw = RawWaker::new(ptr, &TASK_WAKER_VTABLE);
unsafe { Waker::from_raw(raw) }
}
}
static TASK_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
// clone
|ptr| {
let arc = unsafe { Arc::from_raw(ptr as *const TaskWaker) };
let cloned = Arc::clone(&arc);
std::mem::forget(arc);
let new_ptr = Arc::into_raw(cloned) as *const ();
RawWaker::new(new_ptr, &TASK_WAKER_VTABLE)
},
// wake
|ptr| {
let arc = unsafe { Arc::from_raw(ptr as *const TaskWaker) };
let _ = arc.sender.send(arc.task_id);
},
// wake_by_ref
|ptr| {
let arc = unsafe { Arc::from_raw(ptr as *const TaskWaker) };
let _ = arc.sender.send(arc.task_id);
std::mem::forget(arc);
},
// drop
|ptr| {
let _ = unsafe { Arc::from_raw(ptr as *const TaskWaker) };
},
);
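A small sanity test for this vtable (assuming the types above are in the same module): waking, by reference or by value, should push the task id into the channel:

#[test]
fn test_task_waker_reschedules() {
    let (tx, rx) = crossbeam_channel::unbounded();
    let waker = Arc::new(TaskWaker { task_id: 7, sender: tx }).into_waker();

    waker.wake_by_ref(); // keeps the waker (and its Arc) alive
    waker.wake(); // consumes the waker, dropping the Arc

    assert_eq!(rx.recv().unwrap(), 7);
    assert_eq!(rx.recv().unwrap(), 7);
}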
Phase 3: Create the Executor Run Loop
Goal: Build the main loop that polls tasks.
use std::collections::HashMap;
pub struct SimpleExecutor {
tasks: HashMap<usize, Pin<Box<dyn Future<Output = ()> + Send>>>,
ready_queue: (Sender<usize>, Receiver<usize>),
next_id: usize,
}
impl SimpleExecutor {
pub fn new() -> Self {
let (sender, receiver) = crossbeam_channel::unbounded();
SimpleExecutor {
tasks: HashMap::new(),
ready_queue: (sender, receiver),
next_id: 0,
}
}
pub fn spawn<F>(&mut self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
let id = self.next_id;
self.next_id += 1;
self.tasks.insert(id, Box::pin(future));
self.ready_queue.0.send(id).unwrap();
}
pub fn run(&mut self) {
while !self.tasks.is_empty() {
// Wait for a task to be ready
let task_id = match self.ready_queue.1.recv() {
Ok(id) => id,
Err(_) => break,
};
// Get the task
if let Some(mut future) = self.tasks.remove(&task_id) {
// Create waker for this task
let waker_data = Arc::new(TaskWaker {
task_id,
sender: self.ready_queue.0.clone(),
});
let waker = waker_data.into_waker();
let mut cx = Context::from_waker(&waker);
// Poll the task
match future.as_mut().poll(&mut cx) {
Poll::Ready(()) => {
// Task complete, don't put back
println!("Task {} completed", task_id);
}
Poll::Pending => {
// Put task back, will be woken later
self.tasks.insert(task_id, future);
}
}
}
}
}
}
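Putting it together with the Phase 1 YieldNow future, a minimal smoke test of the loop:

fn main() {
    let mut executor = SimpleExecutor::new();

    executor.spawn(async {
        println!("before yield");
        YieldNow::new().await; // wake_by_ref re-queues the task immediately
        println!("after yield");
    });

    // Prints both lines, then "Task 0 completed".
    executor.run();
}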
Phase 4: Add a Task Queue with Arc
Goal: Proper Arc-based task management.
use std::sync::Mutex;
/// A proper task that can be shared via Arc
pub struct Task {
id: usize,
future: Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send>>>>,
sender: Sender<Arc<Task>>,
}
impl Task {
pub fn new<F>(id: usize, future: F, sender: Sender<Arc<Task>>) -> Arc<Self>
where
F: Future<Output = ()> + Send + 'static,
{
Arc::new(Task {
id,
future: Mutex::new(Some(Box::pin(future))),
sender,
})
}
/// Poll this task
pub fn poll(self: &Arc<Self>) -> Poll<()> {
let waker = self.clone().into_waker();
let mut cx = Context::from_waker(&waker);
let mut guard = self.future.lock().unwrap();
if let Some(ref mut future) = *guard {
future.as_mut().poll(&mut cx)
} else {
Poll::Ready(())
}
}
/// Mark task as complete
pub fn complete(&self) {
*self.future.lock().unwrap() = None;
}
/// Convert to Waker
fn into_waker(self: Arc<Self>) -> Waker {
// ... vtable implementation from Phase 2
}
}
Phase 5: Implement a Timer Future
Goal: Build a timer that demonstrates async/await.
use std::time::{Duration, Instant};
use std::sync::Mutex;
/// Global timer registry
lazy_static! {
static ref TIMER_REGISTRY: TimerRegistry = TimerRegistry::new();
}
struct TimerRegistry {
timers: Mutex<Vec<(Instant, Waker)>>,
}
impl TimerRegistry {
fn new() -> Self {
TimerRegistry {
timers: Mutex::new(Vec::new()),
}
}
fn register(&self, deadline: Instant, waker: Waker) {
let mut timers = self.timers.lock().unwrap();
timers.push((deadline, waker));
}
fn check(&self) {
let now = Instant::now();
let mut timers = self.timers.lock().unwrap();
// Partition into expired and not expired
let (expired, remaining): (Vec<_>, Vec<_>) = timers
.drain(..)
.partition(|(deadline, _)| *deadline <= now);
*timers = remaining;
// Wake expired timers
for (_, waker) in expired {
waker.wake();
}
}
}
/// A timer future
pub struct Timer {
deadline: Instant,
registered: bool,
}
impl Timer {
pub fn after(duration: Duration) -> Self {
Timer {
deadline: Instant::now() + duration,
registered: false,
}
}
}
impl Future for Timer {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if Instant::now() >= self.deadline {
Poll::Ready(())
} else {
if !self.registered {
TIMER_REGISTRY.register(self.deadline, cx.waker().clone());
self.registered = true;
}
Poll::Pending
}
}
}
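Nothing calls TIMER_REGISTRY.check() yet, so these timers would never fire on their own. A crude way to drive the registry is a dedicated thread; this is a sketch (a real runtime would instead sleep exactly until the next deadline, as TimerWheel::next_deadline above suggests):

use std::thread;
use std::time::Duration;

/// Spawn a background thread that periodically fires expired timers.
fn spawn_timer_thread() {
    thread::spawn(|| loop {
        TIMER_REGISTRY.check();
        thread::sleep(Duration::from_millis(5));
    });
}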
Phase 6: Multi-Threaded Work-Stealing (Advanced)
Goal: Scale to multiple CPU cores.
WORK-STEALING EXECUTOR DESIGN
=============================
Thread 1 Thread 2 Thread 3
+--------+ +--------+ +--------+
|Worker 1| |Worker 2| |Worker 3|
+--------+ +--------+ +--------+
| | |
+---+----+ +---+----+ +---+----+
| Local | | Local | | Local |
| Queue |<--steal----| Queue |<--steal----| Queue |
| [A,B,C]| | [D,E] | | [] |
+--------+ +--------+ +--------+
|
| steal!
v
[C] stolen
Work-stealing algorithm:
1. Pop from own local queue (fast, no contention)
2. If empty, try to steal from other workers
3. If all empty, check global inject queue
4. If still empty, park thread and wait
Benefits:
- Load balancing happens automatically
- Cache-friendly (usually work on own tasks)
- Scales well to many cores
use crossbeam_deque::{Injector, Stealer, Worker};
pub struct WorkStealingExecutor {
/// Global queue for newly spawned tasks
injector: Injector<Arc<Task>>,
/// Per-worker local queues
workers: Vec<Worker<Arc<Task>>>,
/// Stealers for work-stealing
stealers: Vec<Stealer<Arc<Task>>>,
}
impl WorkStealingExecutor {
pub fn new(num_workers: usize) -> Self {
let injector = Injector::new();
let mut workers = Vec::with_capacity(num_workers);
let mut stealers = Vec::with_capacity(num_workers);
for _ in 0..num_workers {
let worker = Worker::new_fifo();
stealers.push(worker.stealer());
workers.push(worker);
}
WorkStealingExecutor {
injector,
workers,
stealers,
}
}
/// Try to get a task for worker `id`
fn find_task(&self, worker_id: usize) -> Option<Arc<Task>> {
let worker = &self.workers[worker_id];
// 1. Try local queue first
if let Some(task) = worker.pop() {
return Some(task);
}
// 2. Try global injector
loop {
match self.injector.steal_batch_and_pop(worker) {
crossbeam_deque::Steal::Success(task) => return Some(task),
crossbeam_deque::Steal::Empty => break,
crossbeam_deque::Steal::Retry => continue,
}
}
// 3. Try stealing from other workers
for (i, stealer) in self.stealers.iter().enumerate() {
if i == worker_id {
continue;
}
loop {
match stealer.steal() {
crossbeam_deque::Steal::Success(task) => return Some(task),
crossbeam_deque::Steal::Empty => break,
crossbeam_deque::Steal::Retry => continue,
}
}
}
None
}
}
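Each worker thread then runs a loop like the sketch below (shutdown and thread parking are elided, and the Arc-based Task from the Solution Architecture is assumed):

impl WorkStealingExecutor {
    /// One worker's main loop: run tasks, stealing when local work runs dry.
    fn worker_loop(&self, worker_id: usize) {
        loop {
            match self.find_task(worker_id) {
                Some(task) => {
                    // If the task returns Pending, its Waker re-injects it.
                    let _ = task.poll();
                }
                // No work anywhere: a real worker would park here.
                None => std::thread::yield_now(),
            }
        }
    }
}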
Testing Strategy
Testing Async Code
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU32, Ordering};
#[test]
fn test_simple_future() {
let mut executor = Executor::new();
let counter = Arc::new(AtomicU32::new(0));
let counter_clone = counter.clone();
executor.spawn(async move {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
executor.run();
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
#[test]
fn test_multiple_tasks() {
let mut executor = Executor::new();
let counter = Arc::new(AtomicU32::new(0));
for _ in 0..100 {
let counter_clone = counter.clone();
executor.spawn(async move {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
}
executor.run();
assert_eq!(counter.load(Ordering::SeqCst), 100);
}
#[test]
fn test_yield_and_resume() {
let mut executor = Executor::new();
let order = Arc::new(Mutex::new(Vec::new()));
let order1 = order.clone();
let order2 = order.clone();
executor.spawn(async move {
order1.lock().unwrap().push(1);
YieldNow::new().await;
order1.lock().unwrap().push(3);
});
executor.spawn(async move {
order2.lock().unwrap().push(2);
});
executor.run();
let result = order.lock().unwrap().clone();
assert_eq!(result, vec![1, 2, 3]);
}
}
Race Condition Detection
#[test]
fn test_waker_thread_safety() {
use std::thread;
let mut executor = Executor::new();
let (tx, rx) = std::sync::mpsc::channel();
executor.spawn(async move {
// This future will be woken from another thread
struct CrossThreadWake {
waker_sent: bool,
tx: std::sync::mpsc::Sender<Waker>,
rx: std::sync::mpsc::Receiver<()>,
}
impl Future for CrossThreadWake {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if !self.waker_sent {
self.waker_sent = true;
self.tx.send(cx.waker().clone()).unwrap();
Poll::Pending
} else if self.rx.try_recv().is_ok() {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
// Implementation...
});
// Another thread wakes the task
thread::spawn(move || {
let waker = rx.recv().unwrap();
waker.wake();
});
executor.run();
}
// Use Miri for deeper analysis
// cargo +nightly miri test
Loom for Concurrency Testing
#[cfg(loom)]
mod loom_tests {
use loom::sync::Arc;
use loom::thread;
#[test]
fn test_concurrent_wake() {
loom::model(|| {
let waker = Arc::new(TestWaker::new());
let waker1 = waker.clone();
let waker2 = waker.clone();
let t1 = thread::spawn(move || {
waker1.wake();
});
let t2 = thread::spawn(move || {
waker2.wake();
});
t1.join().unwrap();
t2.join().unwrap();
// Both wakes should have been processed
assert!(waker.wake_count() >= 2);
});
}
}
Common Pitfalls
Pitfall 1: Forgetting to Call wake()
Problem: Future returns Pending but never wakes, causing deadlock.
// WRONG - Never wakes!
impl Future for BadTimer {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
if self.deadline <= Instant::now() {
Poll::Ready(())
} else {
Poll::Pending // WHO WILL WAKE US?!
}
}
}
// CORRECT - Registers waker
impl Future for GoodTimer {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.deadline <= Instant::now() {
Poll::Ready(())
} else {
// Register with timer system
TIMER_REGISTRY.register(self.deadline, cx.waker().clone());
Poll::Pending
}
}
}
Pitfall 2: Polling Too Aggressively (Busy-Waiting)
Problem: Executor never sleeps, burns 100% CPU.
// WRONG - Busy loop!
fn run(&self) {
loop {
for task in &self.tasks {
task.poll(); // Poll EVERYTHING constantly
}
}
}
// CORRECT - Only poll ready tasks, sleep when idle
fn run(&self) {
loop {
// Only poll tasks that were woken
while let Ok(task) = self.ready_queue.try_recv() {
task.poll();
}
// Sleep until a wake occurs
match self.ready_queue.recv_timeout(Duration::from_secs(1)) {
Ok(task) => task.poll(),
Err(_) => {
if self.task_count() == 0 {
break;
}
}
}
}
}
Pitfall 3: Memory Leaks from Circular Arc References
Problem: The task stores a Waker that itself holds an Arc back to the task, so the reference count can never reach zero.
// WRONG - Potential cycle
struct BadTask {
future: Box<dyn Future<Output = ()>>,
waker: Option<Waker>, // Waker contains Arc<BadTask>!
}
// The task stores its own waker, creating:
// Arc<BadTask> -> Waker -> RawWaker -> Arc<BadTask>
// Refcount never reaches 0!
// CORRECT - Waker is created fresh for each poll
struct GoodTask {
future: Mutex<Option<Pin<Box<dyn Future<Output = ()>>>>>,
sender: Sender<Arc<GoodTask>>, // No stored waker
}
impl GoodTask {
fn poll(self: &Arc<Self>) {
// Create waker only for this poll
let waker = self.clone().into_waker();
let mut cx = Context::from_waker(&waker);
// waker is dropped after poll, breaking the cycle
}
}
Pitfall 4: Incorrect VTable Function Implementations
Problem: Unsafe code causes undefined behavior.
// WRONG - Forgot to prevent drop
unsafe fn bad_clone(ptr: *const ()) -> RawWaker {
let arc = Arc::from_raw(ptr as *const Task);
let cloned = Arc::clone(&arc);
// OOPS: arc is dropped here, decrementing refcount!
// Original waker now points to potentially freed memory
RawWaker::new(Arc::into_raw(cloned) as *const (), &VTABLE)
}
// CORRECT - Use mem::forget
unsafe fn good_clone(ptr: *const ()) -> RawWaker {
let arc = Arc::from_raw(ptr as *const Task);
let cloned = Arc::clone(&arc);
std::mem::forget(arc); // Don't decrement original's refcount!
RawWaker::new(Arc::into_raw(cloned) as *const (), &VTABLE)
}
Pitfall 5: Not Handling Spurious Wakeups
Problem: Assume every wake means progress.
// WRONG - Assumes wake means ready
impl Future for BadFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
// "We were woken, so we must be ready!"
Poll::Ready(()) // WRONG - might have been spurious
}
}
// CORRECT - Always re-check condition
impl Future for GoodFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.is_actually_ready() {
Poll::Ready(())
} else {
// Re-register waker in case we need to wait again
self.register_waker(cx.waker().clone());
Poll::Pending
}
}
}
Extensions
I/O Reactor Integration
Integrate with the OS event system for true async I/O:
use mio::{Events, Interest, Poll, Token};
use std::collections::HashMap;
use std::os::unix::io::RawFd;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::task::Waker;
use std::time::Duration;
pub struct IoReactor {
poll: Poll,
token_to_waker: Mutex<HashMap<Token, Waker>>,
next_token: AtomicUsize,
}
impl IoReactor {
pub fn new() -> Self {
IoReactor {
poll: Poll::new().unwrap(),
token_to_waker: Mutex::new(HashMap::new()),
next_token: AtomicUsize::new(0),
}
}
pub fn register_readable(&self, fd: RawFd, waker: Waker) -> Token {
let token = Token(self.next_token.fetch_add(1, Ordering::SeqCst));
// Register with mio
self.poll.registry().register(
&mut mio::unix::SourceFd(&fd),
token,
Interest::READABLE,
).unwrap();
// Store waker
self.token_to_waker.lock().unwrap().insert(token, waker);
token
}
/// Note: mio's Poll::poll takes &mut self, so this method needs it too.
pub fn poll_events(&mut self, timeout: Duration) {
let mut events = Events::with_capacity(64);
self.poll.poll(&mut events, Some(timeout)).unwrap();
for event in &events {
if let Some(waker) = self.token_to_waker.lock().unwrap().remove(&event.token()) {
waker.wake();
}
}
}
}
Work-Stealing Scheduler
See Phase 6 for the full implementation. Key concepts:
WORK-STEALING ALGORITHM
=======================
1. Each worker has a local FIFO queue
2. New tasks go to local queue (cache-friendly)
3. When local queue empty:
a. Check global inject queue
b. Steal from other workers' queues
4. Stealing happens from the "old" end (LIFO steal)
5. This balances load automatically
Performance characteristics:
- Push to local: O(1), no contention
- Pop from local: O(1), no contention
- Steal: O(1), may have contention
- Overall throughput scales with cores
The Interview Questions
- "Explain the relationship between a Future, a Waker, and an Executor."
Answer: A Future is a value that may not be ready yet - it implements poll(), which returns Ready(value) or Pending. The Executor is the driver that calls poll() on futures. The Waker is the callback mechanism that tells the Executor "this Future is ready to make progress, poll it again." The Waker is passed to poll() via Context, allowing the Future to register for wake-up notifications.
- "Why is the poll method designed to be non-blocking?"
Answer: If poll() blocked, it would prevent the executor from making progress on other tasks. The non-blocking design allows cooperative multitasking: each poll() does a bit of work, returns Pending if not done, and the executor can poll other tasks. This enables efficient concurrency without preemption.
- "What happens if a Waker is dropped before the task is finished?"
Answer: If the Waker is dropped and no other wake mechanism exists, the Future may never be polled again, causing it to hang indefinitely. This is a bug in the async code. Properly designed futures ensure they either complete, return an error, or the Waker is stored somewhere that will eventually call wake().
- "How does Tokio handle multi-threaded scheduling?"
Answer: Tokio uses a work-stealing scheduler. Each worker thread has a local queue. New tasks go to the spawning thread's local queue. When a worker's queue is empty, it steals from other workers' queues. This provides good cache locality (most work stays on one thread) while automatically load-balancing across cores.
- "Why does RawWaker use a vtable instead of generics?"
Answer: The Waker must be a concrete type that can be stored in Context and returned from methods. If it were generic over the executor's task type, every Future would need to be generic too, making trait objects impossible. The vtable provides type erasure: the Waker has a fixed size (two pointers) but can call executor-specific code through the vtable function pointers.
- "How would you implement a timeout for a Future?"
Answer: Wrap the original future with a timeout future that races between the inner future and a timer. Use select! or a custom combinator that polls both futures and returns whichever completes first. The timer registers with the reactor and wakes when the deadline passes. If the timer wins, return a timeout error.
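For that last question, here is a minimal sketch of such a combinator built on this project's Timer from Phase 5. It requires the inner future to be Unpin to keep the example short; a production version would use pin projection instead:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

/// Races an inner future against a Timer deadline.
pub struct Timeout<F> {
    inner: F,
    timer: Timer,
}

pub fn timeout<F: Future + Unpin>(dur: Duration, inner: F) -> Timeout<F> {
    Timeout { inner, timer: Timer::after(dur) }
}

impl<F: Future + Unpin> Future for Timeout<F> {
    type Output = Result<F::Output, &'static str>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Give the inner future the first chance to complete.
        if let Poll::Ready(v) = Pin::new(&mut self.inner).poll(cx) {
            return Poll::Ready(Ok(v));
        }
        // The timer registers its waker too, so whichever fires first wakes us.
        if Pin::new(&mut self.timer).poll(cx).is_ready() {
            return Poll::Ready(Err("timed out"));
        }
        Poll::Pending
    }
}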
Books That Will Help
| Topic | Book | Chapter/Section |
|---|---|---|
| Async Design Patterns | "Rust for Rustaceans" by Jon Gjengset | Ch. 8: Asynchronous Programming |
| Atomics & Memory Ordering | "Rust Atomics and Locks" by Mara Bos | Ch. 1: Basics of Rust Concurrency |
| Trait VTables | "Programming Rust" by Blandy & Orendorff | Ch. 11: Traits and Generics |
| OS Concurrency | "Operating Systems: Three Easy Pieces" | Part II: Concurrency |
| Runtime Internals | Tokio Tutorial | Advanced Topics |
| Systems Programming | "The Linux Programming Interface" | Ch. 63: Alternative I/O Models |
Summary
Building a custom async runtime teaches you the deepest internals of async Rust:
- The Future trait is a state machine that makes progress via poll()
- Wakers use type-erased vtables to notify executors about readiness
- Executors maintain ready queues and only poll woken tasks
- The Reactor/Executor split separates I/O waiting from task execution
- Arc-based tasks enable safe sharing between executor and wakers
- Timer wheels provide efficient scheduling for many timers
With this knowledge, you can:
- Debug complex async issues
- Understand Tokio/async-std internals
- Build custom async primitives
- Optimize async performance
- Contribute to async runtime projects
What's Next?
After completing this project, consider:
- Project 6: Atomic Lock-Free Queue - Build the data structures used in executor queues
- Project 2: Box-less Async Trait - Understand zero-cost async with GATs
- Read the async-executor crate source (it's only ~1000 lines!)
- Explore Tokio's source code for production patterns
- Build an I/O reactor using mio or io-uring