P03: Mini Async Runtime
Build a single-threaded async runtime that handles thousands of concurrent connections
| Attribute | Value |
|---|---|
| Main Language | Rust or C |
| Difficulty | Level 5: Master |
| Coolness Level | Level 5: Pure Magic (Super Cool) |
| Knowledge Area | Asynchronous I/O, Runtime Design |
| Key Tools | epoll/kqueue, futures, tokio (for reference) |
| Main Book | "The Linux Programming Interface" by Michael Kerrisk |
Learning Objectives
By completing this project, you will:
- Understand async fundamentals: Know exactly what happens when you .await something
- Master I/O multiplexing: Use epoll (Linux) or kqueue (macOS/BSD) to handle thousands of sockets
- Build the reactor pattern: Implement the event loop that drives async execution
- Implement futures/tasks: Create the abstraction that makes async code readable
- Handle the C10K problem: Serve 10,000+ concurrent connections on a single thread
- Demystify async runtimes: Understand what Tokio, libuv, and Go's netpoll actually do
The Core Question
"How can a single thread handle 10,000 concurrent connections, and what makes async/await work?"
Most developers use async like magic incantations. They call .await without understanding:
- What exactly happens when code "awaits"?
- How does the runtime know when an operation is ready?
- Why doesn't the CPU spin at 100% while waiting?
- How is this different from threads?
After this project, async won't be magic; it will be machinery you built yourself.
The key insight: Async is just cooperative multitasking. Instead of the OS preempting threads, your code voluntarily yields when it would block. The runtime tracks what's waiting for what, and resumes tasks when their I/O is ready.
Deep Theoretical Foundation
1. The Problem with Threads
Thread-Per-Connection Model
Traditional servers create one thread per connection:
Connection 1 -> Thread 1 (blocked reading socket)
Connection 2 -> Thread 2 (blocked reading socket)
...
Connection 10000 -> Thread 10000 (blocked reading socket)
Why This Fails:
- Each thread needs 1-8 MB of stack space, so 10,000 threads need 10-80 GB of RAM
- Context switching 10,000 threads kills performance
- Most threads are blocked waiting for I/O (wasted resources)
The C10K Problem: How do you handle 10,000 concurrent connections?
Answer: Don't use threads for waiting. Use event-driven I/O.
2. Event-Driven I/O
Non-Blocking Sockets
Instead of blocking when data isn't ready, non-blocking sockets return immediately:
// Blocking (traditional)
read(fd, buf, 1024); // Blocks until data arrives
// Non-blocking
fcntl(fd, F_SETFL, O_NONBLOCK);
ssize_t n = read(fd, buf, 1024);
if (n == -1 && errno == EAGAIN) {
// No data yet, but we didn't block!
// Do something else and try later
}
IO Multiplexing
The kernel can tell us which file descriptors are ready:
// "Tell me when ANY of these 10,000 sockets has data"
int ready_count = epoll_wait(epoll_fd, events, 10000, timeout);
// Now we only process the ones that are actually ready
for (int i = 0; i < ready_count; i++) {
handle_ready_socket(events[i].data.fd);
}
3. epoll (Linux) Deep Dive
Creating an epoll Instance
int epoll_fd = epoll_create1(0); // Returns a file descriptor
Registering Interest
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // Read events, edge-triggered
ev.data.fd = socket_fd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket_fd, &ev);
Waiting for Events
struct epoll_event events[MAX_EVENTS];
int n = epoll_wait(epoll_fd, events, MAX_EVENTS, timeout_ms);
for (int i = 0; i < n; i++) {
if (events[i].events & EPOLLIN) {
// Socket is readable
handle_read(events[i].data.fd);
}
if (events[i].events & EPOLLOUT) {
// Socket is writable
handle_write(events[i].data.fd);
}
}
Edge-Triggered vs Level-Triggered
| Mode | Behavior | Use Case |
|---|---|---|
| Level-triggered | Reports ready as long as condition holds | Simpler, more forgiving |
| Edge-triggered | Reports only on state change | More efficient, requires draining |
Edge-triggered is faster but requires reading until EAGAIN:
// Edge-triggered: must drain the socket
while (true) {
ssize_t n = read(fd, buf, sizeof(buf));
if (n == -1 && errno == EAGAIN) break; // No more data
if (n == 0) { close(fd); break; } // EOF
process_data(buf, n);
}
4. kqueue (macOS/BSD)
Similar concept, different API:
int kq = kqueue();
struct kevent ev;
EV_SET(&ev, socket_fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
kevent(kq, &ev, 1, NULL, 0, NULL); // Register
struct kevent events[MAX_EVENTS];
struct timespec timeout = { .tv_sec = 1, .tv_nsec = 0 };  // wait up to 1 second
int n = kevent(kq, NULL, 0, events, MAX_EVENTS, &timeout);
for (int i = 0; i < n; i++) {
handle_event(&events[i]);
}
5. The Reactor Pattern
The reactor is the core of your async runtime:
Event Loop (Reactor)
  1. Call epoll_wait()
  2. For each ready event:
       - find the associated task
       - wake the task
  3. Run ready tasks until they yield
  4. Repeat from step 1
Reactor Components:
- Event sources: File descriptors registered with epoll/kqueue
- Task queue: Tasks waiting to be polled
- Waker registry: Maps FDs to tasks that should wake when ready
- Executor: Runs tasks when they're woken
6. Futures and Tasks
What is a Future?
A future represents a value that will be available later:
// Rust-style future (simplified)
trait Future {
type Output;
fn poll(&mut self, cx: &mut Context) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T), // Value is available now
Pending, // Not ready, will wake when ready
}
Polling Model
Futures are "polled" to make progress:
poll(future):
- If work is ready: Do work, return Ready(result)
- If would block: Register waker, return Pending
Event loop:
while tasks exist:
poll all ready tasks
wait for events
wake tasks whose events fired
Wakers
A waker tells the runtime "this task should be polled again":
struct Waker {
task_id: TaskId,
wake_fn: fn(TaskId),
}
impl Waker {
fn wake(&self) {
(self.wake_fn)(self.task_id);
}
}
When a future would block:
- It stores the waker somewhere (e.g., in the reactor's FD-to-waker map)
- It returns Pending
- When the FD becomes ready, the reactor calls waker.wake()
- The task is added back to the ready queue
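To make this protocol concrete, here is a minimal, self-contained leaf future (an illustrative sketch, not part of the project code): a hypothetical YieldNow that returns Pending once and immediately schedules its own wake-up. In a real runtime the waker would be handed to the reactor instead of being called on the spot.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// Hypothetical example: yields control to the executor exactly once.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // A real I/O future would store this waker in the reactor's
            // FD-to-waker map; here we wake immediately so the executor
            // re-polls us on its next pass.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}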
7. State Machines (How async/await Compiles)
Consider this async function:
async fn read_and_write(stream: TcpStream) {
let data = stream.read().await; // Suspend point 1
stream.write(data).await; // Suspend point 2
}
The compiler transforms it into a state machine:
enum ReadAndWriteState {
Start { stream: TcpStream },
WaitingForRead { stream: TcpStream, read_future: ReadFuture },
WaitingForWrite { stream: TcpStream, data: Vec<u8>, write_future: WriteFuture },
Done,
}
impl Future for ReadAndWriteStateMachine {
fn poll(&mut self, cx: &mut Context) -> Poll<()> {
loop {
match self.state {
Start { stream } => {
let read_future = stream.read();
self.state = WaitingForRead { stream, read_future };
}
WaitingForRead { stream, read_future } => {
match read_future.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(data) => {
let write_future = stream.write(data);
self.state = WaitingForWrite { stream, data, write_future };
}
}
}
WaitingForWrite { write_future, .. } => {
match write_future.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(()) => {
self.state = Done;
return Poll::Ready(());
}
}
}
Done => panic!("polled after completion"),
}
}
}
}
Key insight: Async functions become state machines where each await point is a state transition.
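You can observe this transformation without reading compiler output (an illustrative sketch, not project code): any local that is live across an .await has to be stored inside the generated state machine, so the future's size grows with it.
use std::future::ready;
use std::mem::size_of_val;

// The 1 KiB buffer is used after the .await, so the generated state
// machine has to carry it across the suspend point.
async fn holds_buffer_across_await() -> usize {
    let buf = [0u8; 1024];
    let n = ready(buf.len()).await;
    n + buf[0] as usize
}

fn main() {
    let fut = holds_buffer_across_await();
    // Expect roughly the buffer size plus a little bookkeeping.
    println!("future size: {} bytes", size_of_val(&fut));
}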
8. Timers
Async runtimes need timers for timeouts and delays:
sleep(Duration::from_secs(5)).await;
Implementation approaches:
- Timer wheel: Efficient for many timers, O(1) insert/expire
- Binary heap: Simple priority queue, O(log n) operations
- timerfd (Linux): Kernel timer as a file descriptor
// timerfd example
int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
struct itimerspec spec = {
.it_value = { .tv_sec = 5, .tv_nsec = 0 },
.it_interval = { 0 }
};
timerfd_settime(tfd, 0, &spec, NULL);
// Register with epoll like any other FD
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, tfd, &ev);
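A portable alternative to timerfd is to keep timers in user space and derive the epoll/kevent timeout from the earliest deadline. A minimal sketch of the binary-heap approach (type and field names here are illustrative, not part of the spec):
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};
use std::task::Waker;
use std::time::{Duration, Instant};

// Hypothetical timer queue: Reverse makes the earliest deadline pop first.
struct TimerQueue {
    heap: BinaryHeap<Reverse<(Instant, u64)>>, // (deadline, timer id)
    wakers: HashMap<u64, Waker>,
    next_id: u64,
}

impl TimerQueue {
    fn insert(&mut self, deadline: Instant, waker: Waker) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.heap.push(Reverse((deadline, id)));
        self.wakers.insert(id, waker);
        id
    }

    // Timeout to pass to epoll_wait/kevent: time until the earliest deadline.
    fn next_timeout(&self, now: Instant) -> Option<Duration> {
        let Reverse((deadline, _)) = self.heap.peek().copied()?;
        Some(deadline.saturating_duration_since(now))
    }

    // Pop and wake every timer whose deadline has already passed.
    fn fire_expired(&mut self, now: Instant) {
        while let Some(Reverse((deadline, id))) = self.heap.peek().copied() {
            if deadline > now {
                break;
            }
            self.heap.pop();
            if let Some(waker) = self.wakers.remove(&id) {
                waker.wake();
            }
        }
    }
}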
Project Specification
What Youโll Build
A single-threaded async runtime capable of:
// Example API (Rust-style)
let runtime = Runtime::new();
runtime.block_on(async {
// TCP server handling many connections concurrently
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (stream, addr) = listener.accept().await?;
spawn(handle_connection(stream)); // Spawn concurrent task
}
});
async fn handle_connection(stream: TcpStream) {
let mut buf = [0u8; 1024];
loop {
let n = stream.read(&mut buf).await?;
if n == 0 { break; }
stream.write_all(&buf[..n]).await?;
}
}
Deliverables
- Reactor: Event loop using epoll (Linux) or kqueue (macOS)
- Executor: Task scheduler that polls futures
- Async TCP: TcpListener and TcpStream with async read/write
- Timer support: sleep() and timeout() primitives
- Echo server: Demo serving 10,000 concurrent connections
- Benchmark suite: Comparison with synchronous and Tokio
Success Criteria
# Build and run echo server
$ cargo run --release --example echo_server
# Benchmark with wrk or similar
$ wrk -c 1000 -d 30s http://localhost:8080/
Requests/sec: 100,000+
Latency: < 1ms avg
# Memory usage stays flat with connection count
$ ./echo_server &
$ for i in $(seq 1 10000); do nc localhost 8080 & done
$ ps aux | grep echo_server
# RSS should be < 100 MB
Solution Architecture
Component Overview
Async Runtime
  Executor
    Task queue (ready tasks): [Task 1] [Task 2] [Task 3] ...
        |
        | poll()
        v
    Futures
      TcpRead  { fd, buf, waker }
      TcpWrite { fd, data, waker }
      Sleep    { deadline, waker }
        |
        | register waker
        v
    Reactor
      epoll_fd: 5
      waker_map: { fd -> waker }
      timer_queue: BinaryHeap<(deadline, waker)>
        |
        | epoll_wait()
        v
  OS kernel (epoll/kqueue)
    [socket 1: readable] [socket 2: writable] ...
Data Structures
Task
struct Task {
id: TaskId,
future: Pin<Box<dyn Future<Output = ()>>>,
state: TaskState,
}
enum TaskState {
Ready, // Should be polled
Waiting, // Waiting for I/O or timer
Completed, // Done, can be removed
}
Reactor
struct Reactor {
epoll_fd: RawFd,
waker_map: HashMap<RawFd, Waker>,
timer_queue: BinaryHeap<TimerEntry>,
}
impl Reactor {
fn register(&mut self, fd: RawFd, interest: Interest, waker: Waker);
fn deregister(&mut self, fd: RawFd);
fn poll(&mut self, timeout: Option<Duration>) -> Vec<Waker>;
}
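On Linux, Reactor::poll can be a thin wrapper around epoll_wait. Below is a simplified sketch assuming the libc crate, with error handling, deregistration, and the timer queue omitted; it follows the field names above but is not the reference implementation.
use std::collections::HashMap;
use std::os::unix::io::RawFd;
use std::task::Waker;
use std::time::Duration;

struct Reactor {
    epoll_fd: RawFd,
    waker_map: HashMap<RawFd, Waker>,
}

impl Reactor {
    fn register(&mut self, fd: RawFd, waker: Waker) {
        let mut ev = libc::epoll_event {
            events: (libc::EPOLLIN | libc::EPOLLET) as u32,
            u64: fd as u64, // stash the fd so the waker can be found later
        };
        // First registration adds the fd; later calls just refresh the waker.
        let op = if self.waker_map.contains_key(&fd) {
            libc::EPOLL_CTL_MOD
        } else {
            libc::EPOLL_CTL_ADD
        };
        unsafe { libc::epoll_ctl(self.epoll_fd, op, fd, &mut ev) };
        self.waker_map.insert(fd, waker);
    }

    // Block until an FD is ready or the timeout expires, then return the
    // wakers of every task whose I/O became ready.
    fn poll(&mut self, timeout: Option<Duration>) -> Vec<Waker> {
        let mut events = [libc::epoll_event { events: 0, u64: 0 }; 64];
        let timeout_ms = timeout.map_or(-1, |d| d.as_millis() as i32);
        let n = unsafe {
            libc::epoll_wait(self.epoll_fd, events.as_mut_ptr(), events.len() as i32, timeout_ms)
        };
        let mut wakers = Vec::new();
        for ev in &events[..n.max(0) as usize] {
            let fd = ev.u64 as RawFd;
            if let Some(waker) = self.waker_map.get(&fd) {
                wakers.push(waker.clone());
            }
        }
        wakers
    }
}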
Executor
struct Executor {
ready_queue: VecDeque<TaskId>,
tasks: HashMap<TaskId, Task>,
reactor: Reactor,
}
impl Executor {
fn spawn(&mut self, future: impl Future<Output = ()>);
fn block_on<T>(&mut self, future: impl Future<Output = T>) -> T;
}
Key Algorithms
Main Event Loop
block_on(main_future):
1. spawn(main_future)
2. loop:
a. while ready_queue not empty:
- task = ready_queue.pop()
- result = task.future.poll(create_waker(task.id))
- if result == Ready: remove task
- if result == Pending: mark task waiting
b. if no tasks remain: return
c. next_timer = get_next_timer_deadline()
d. timeout = next_timer - now()
e. wakers = reactor.poll(timeout)
f. for each waker:
- mark task ready
- add to ready_queue
Async Read Implementation
struct TcpRead<'a> {
stream: &'a TcpStream,
buf: &'a mut [u8],
}
impl Future for TcpRead<'_> {
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.stream.try_read(self.buf) {
Ok(n) => Poll::Ready(Ok(n)),
Err(e) if e.kind() == WouldBlock => {
// Register waker with reactor
REACTOR.with(|r| {
r.borrow_mut().register(
self.stream.fd,
Interest::Readable,
cx.waker().clone()
);
});
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
}
Phased Implementation Guide
Phase 1: Event Loop Skeleton (Days 1-2)
Goal: Basic epoll wrapper that can detect socket events.
Steps:
- Create epoll instance
- Accept a TCP connection, set non-blocking
- Register socket with epoll
- Loop: wait for events, print when readable
Test: Can detect when data arrives on a socket.
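One possible Phase 1 starting point, sketched with the libc crate (return values unchecked, single listening socket only; names are illustrative):
use std::net::TcpListener;
use std::os::unix::io::AsRawFd;

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;
    listener.set_nonblocking(true)?;

    // Create the epoll instance and register the listening socket for reads.
    let epoll_fd = unsafe { libc::epoll_create1(0) };
    let mut ev = libc::epoll_event {
        events: libc::EPOLLIN as u32,
        u64: listener.as_raw_fd() as u64,
    };
    unsafe {
        libc::epoll_ctl(epoll_fd, libc::EPOLL_CTL_ADD, listener.as_raw_fd(), &mut ev);
    }

    let mut events = [libc::epoll_event { events: 0, u64: 0 }; 16];
    loop {
        // Block until at least one registered FD is ready.
        let n = unsafe {
            libc::epoll_wait(epoll_fd, events.as_mut_ptr(), events.len() as i32, -1)
        };
        for ev in &events[..n.max(0) as usize] {
            let fd = ev.u64; // copy out of the (packed) event before printing
            println!("fd {fd} is readable: accept() would not block now");
        }
    }
}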
Phase 2: Simple Executor (Days 3-4)
Goal: Execute futures without async I/O.
Steps:
- Define the Future trait and Poll enum
- Create a task struct holding boxed futures
- Implement ready queue and polling loop
- Test with futures that just compute (no I/O)
Test: Can execute a chain of futures that pass values.
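A minimal single-future block_on is enough to drive the Phase 2 tests. This sketch uses only the standard library (the std::task::Wake trait plus thread parking) and is illustrative rather than the required design:
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

// Run one future to completion by polling it and parking the thread
// whenever it returns Pending.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark(); // resume the parked block_on loop
    }
}

fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(), // woken by ThreadWaker::wake
        }
    }
}

fn main() {
    // Futures that just compute complete on the first poll.
    let v = block_on(async { 1 + 2 });
    assert_eq!(v, 3);
}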
Phase 3: Waker System (Days 4-5)
Goal: Futures can register to be woken.
Steps:
- Implement Waker and Context
- Create the waker-to-task mapping in the reactor
- When epoll returns ready FD, find and wake the task
- Test with manual I/O futures
Test: Future sleeps, wakes when socket is readable.
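One way to mint a real Waker per task is the standard library's Wake trait; note that it requires Send + Sync (hence the Mutex here), while a purely single-threaded runtime can avoid that with a hand-rolled RawWakerVTable. A sketch with illustrative names:
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

type TaskId = usize;

// Hypothetical Phase 3 sketch: a waker that re-queues its task by id.
// The executor owns the same queue and drains it on every loop iteration.
struct TaskWaker {
    task_id: TaskId,
    ready_queue: Arc<Mutex<VecDeque<TaskId>>>,
}

impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        self.ready_queue.lock().unwrap().push_back(self.task_id);
    }
}

fn waker_for(task_id: TaskId, queue: Arc<Mutex<VecDeque<TaskId>>>) -> Waker {
    Waker::from(Arc::new(TaskWaker { task_id, ready_queue: queue }))
}
The reactor stores one such waker per FD; when epoll reports the FD ready, calling wake() pushes the task id back onto the ready queue for the executor's next pass.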
Phase 4: Async TCP (Days 6-7)
Goal: Usable async TcpListener and TcpStream.
Steps:
- Wrap raw sockets in async types
- Implement async fn accept(), async fn read(), and async fn write()
- Handle edge-triggered epoll correctly (drain reads)
- Test with echo server
Test: Echo server handles multiple concurrent connections.
Phase 5: Timers (Days 7-8)
Goal: sleep() and timeout() work.
Steps:
- Add timer queue (binary heap by deadline)
- Calculate epoll timeout from next timer
- Implement the Sleep future
- Implement the timeout() combinator
Test: Requests that take too long are cancelled.
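A Sleep future only needs to record its deadline and waker somewhere the event loop can see. The sketch below assumes the runtime drains a thread-local timer list before each epoll_wait; that thread-local stands in for the real timer queue and is not part of the spec:
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use std::time::Instant;

thread_local! {
    // Stand-in for the runtime's timer queue (a real one would be the
    // binary heap sketched earlier); the reactor drains this before waiting.
    static TIMERS: RefCell<Vec<(Instant, Waker)>> = RefCell::new(Vec::new());
}

struct Sleep {
    deadline: Instant,
}

impl Future for Sleep {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.deadline {
            Poll::Ready(())
        } else {
            // Hand the deadline and waker to the timer queue; the event loop
            // wakes this task once the deadline has passed.
            TIMERS.with(|t| t.borrow_mut().push((self.deadline, cx.waker().clone())));
            Poll::Pending
        }
    }
}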
Phase 6: Polish and Benchmark (Days 9-10)
Goal: Production-quality runtime.
Steps:
- Error handling throughout
- Graceful shutdown
- Benchmark against Tokio and synchronous
- Document internals
Testing Strategy
Unit Tests
#[test]
fn test_future_polling() {
let mut fut = std::pin::pin!(async { 42 });
let waker = dummy_waker();
let mut cx = Context::from_waker(&waker);
// Should complete immediately
assert!(matches!(fut.as_mut().poll(&mut cx), Poll::Ready(42)));
}
#[test]
fn test_spawn_and_run() {
let runtime = Runtime::new();
let counter = Rc::new(Cell::new(0));
let counter2 = counter.clone();
runtime.block_on(async move {
for _ in 0..100 {
let task_counter = counter2.clone();
spawn(async move { task_counter.set(task_counter.get() + 1); });
}
});
assert_eq!(counter.get(), 100);
}
Integration Tests
#[test]
fn test_echo_server() {
let runtime = Runtime::new();
runtime.block_on(async {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr();
spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
// Echo logic
});
// Client connects and sends data
let mut client = TcpStream::connect(addr).await.unwrap();
client.write_all(b"hello").await.unwrap();
let mut buf = [0u8; 5];
client.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf, b"hello");
});
}
Stress Tests
#[test]
fn test_many_connections() {
let runtime = Runtime::new();
runtime.block_on(async {
let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
// Accept 10,000 connections
for _ in 0..10_000 {
let (stream, _) = listener.accept().await.unwrap();
spawn(handle_connection(stream));
}
});
}
Common Pitfalls and Debugging
Pitfall 1: Forgetting to Register Waker
Symptom: Task never wakes up after returning Pending.
Cause: Forgot to register with reactor before returning Pending.
Fix:
fn poll(...) -> Poll<T> {
if not_ready {
// MUST register waker before returning Pending!
reactor.register(fd, cx.waker().clone());
return Poll::Pending;
}
Poll::Ready(value)
}
Pitfall 2: Edge-Triggered Partial Reads
Symptom: Connections hang after receiving some data.
Cause: With edge-triggered epoll, you get notified once when data arrives. If you don't read all of it, you won't get another notification.
Fix:
// Always drain the socket
loop {
match socket.try_read(&mut buf) {
Ok(0) => break, // EOF
Ok(n) => process(&buf[..n]),
Err(e) if e.kind() == WouldBlock => break, // Done
Err(e) => return Err(e),
}
}
Pitfall 3: Waker Cloning
Symptom: Memory leaks or double-wakes.
Cause: Wakers must be cloned carefully; they're reference-counted.
Fix: Use the waker from Context, clone it when storing:
// Store a clone, not a reference
self.stored_waker = Some(cx.waker().clone());
Pitfall 4: Timer Queue Ordering
Symptom: Timers fire at wrong times or not at all.
Cause: Binary heap ordered incorrectly, or timeout calculation wrong.
Fix:
// Timer entry should order by deadline (earliest first)
impl Ord for TimerEntry {
fn cmp(&self, other: &Self) -> Ordering {
// Reverse because BinaryHeap is a max-heap
other.deadline.cmp(&self.deadline)
}
}
Extensions and Challenges
Extension 1: Multi-Threaded Runtime
Add a thread pool for CPU-bound work:
let result = spawn_blocking(|| {
expensive_computation()
}).await;
Extension 2: Async Filesystem I/O
Use io_uring (Linux 5.1+) for true async file I/O:
let data = async_read_file("data.txt").await?;
Extension 3: Select/Join Combinators
// Wait for first to complete
let result = select! {
a = future_a => handle_a(a),
b = future_b => handle_b(b),
};
// Wait for all to complete
let (a, b) = join!(future_a, future_b);
Extension 4: Cancellation
let handle = spawn(long_running_task());
handle.cancel(); // Task is dropped at next yield point
Real-World Connections
Where Async Runtimes are Used
- Web Servers: Nginx, Node.js, and Rust's Actix/Axum all use event-driven I/O
- Databases: Redis, Memcached handle thousands of connections on few threads
- Proxies: HAProxy, Envoy use epoll for massive connection counts
- Game Servers: Handle many players with minimal threads
Industry Runtimes to Study
| Runtime | Language | Key Feature |
|---|---|---|
| Tokio | Rust | Work-stealing, io_uring support |
| libuv | C | Powers Node.js, cross-platform |
| Go runtime | Go | Goroutines with network poller |
| async-std | Rust | Simpler alternative to Tokio |
Resources
Essential Reading
- "The Linux Programming Interface" by Michael Kerrisk (Chapter 63: Alternative I/O Models)
- "Asynchronous Programming in Rust" (the Async Book), the official Rust async documentation
- "200 Lines of Rust" by Cfsamson, an excellent async runtime tutorial
Papers and Articles
- "Scalable I/O Event Notification" (the kqueue paper)
- "Epoll is fundamentally broken" by Marek Majkowski (Cloudflare)
- "How Tokio works" by Alice Ryhl (Tokio maintainer)
Code to Study
- tokio: Production Rust async runtime
- mio: Low-level I/O library (epoll/kqueue wrapper)
- smol: Minimal async runtime (great for learning)
- libuv: Cross-platform event loop (C)
Self-Assessment Checklist
Before considering this project complete, verify:
Understanding
- I can explain what happens when code calls .await
- I can describe the difference between level-triggered and edge-triggered epoll
- I can explain how wakers connect futures to the reactor
- I can draw the state machine an async function compiles to
- I can explain why async uses less memory than threads
Implementation
- My reactor correctly uses epoll/kqueue
- My executor polls tasks and handles wakers
- I can handle 1,000+ concurrent connections
- Timers work correctly (sleep, timeout)
- No busy-waiting (CPU usage drops when idle)
Verification
- Echo server handles 10,000 concurrent connections
- Memory usage scales sub-linearly with connections
- Latency stays low under load (< 1ms p99)
- No memory leaks (check with valgrind)
Building an async runtime teaches you what all the magic keywords actually do. After this project, async/await will never be mysterious again.