Project 11: Async File I/O Library
Build an async file I/O library where read/write operations are non-blocking coroutines, backed by a thread pool that simulates production async I/O patterns.
Quick Reference
| Attribute | Value |
|---|---|
| Language | C++20 (coroutines required) |
| Difficulty | Expert |
| Time | 2-3 weeks |
| Prerequisites | Project 3 (Thread Pool), Project 10 (Async Task Framework) |
| Coolness | Level 4: Hardcore Tech Flex |
| Portfolio Value | Service & Support Model |
Learning Objectives
By completing this project, you will be able to:
- Implement spawn_blocking for thread offloading: Create a mechanism that queues synchronous work to a thread pool and returns a Task that completes when the work finishes
- Design AsyncFile with coroutine-based read/write: Build file handles whose operations return Task<size_t> that can be co_awaited without blocking the async runtime
- Bridge synchronous and asynchronous APIs: Understand why traditional file I/O is blocking and how to wrap it in non-blocking coroutine interfaces
- Handle errno-based errors in async contexts: Map C-style errno errors to C++ exceptions that propagate correctly through coroutine suspension points
- Implement proper RAII for async file handles: Ensure file descriptors are correctly managed across async lifetimes and coroutine frames
- Understand the architecture of production async runtimes: See how Tokio, ASIO, and libuv actually structure their I/O layers
- Optimize with readahead and write buffering: Implement performance optimizations that overlap I/O with computation
- Prepare for io_uring integration: Understand the limitations of thread-pool-based async I/O and how true kernel async I/O eliminates them
- Profile and benchmark async I/O performance: Measure throughput, latency, and thread utilization to verify your implementation
- Debug async I/O issues: Trace through coroutine suspension to find bugs in async file operations
Theoretical Foundation
Core Concepts
Why File I/O Needs Special Handling
Traditional file I/O in POSIX systems is synchronous and blocking. When you call read() or write(), the calling thread is suspended until the kernel completes the operation:
Traditional Blocking I/O:
Thread Kernel Disk
| | |
+-- read(fd) ------->| |
| [BLOCKED] +-- disk read ----->|
| [BLOCKED] |<-- data ----------+
|<-- return data ----+ |
| | |
[Thread was idle for entire disk access time]
This is problematic for async programs because:
- Thread starvation: A blocked thread can’t run other coroutines
- Wasted resources: Threads consume memory even when waiting
- Poor scalability: Need one thread per concurrent I/O operation
The Async I/O Solution
Async runtimes solve this by separating the “request” from the “completion”:
Async I/O with Thread Pool:
Coroutine Thread Pool Kernel Disk
| | | |
co_await read() | | |
| | | |
[suspend] -----------> queue work | |
| Worker picks up | |
| +-- read(fd) ----->| |
| | [blocked] +-- disk ------>|
| | [blocked] |<-- data ------+
| |<-- return --------+ |
|<----- resume ------+ | |
| | | |
[Coroutine was suspended, runtime ran other tasks]
Three Approaches to Async File I/O
1. Thread Pool Offloading (This Project)
- Queue blocking I/O to dedicated threads
- Simple to implement, works everywhere
- Overhead: context switches, thread pool management
- Used by: Node.js (libuv), Tokio (tokio::fs runs file operations on its blocking thread pool)
2. io_uring (Linux 5.1+)
- Kernel-based async I/O with shared ring buffers
- Zero-copy, minimal syscalls
- Highest performance for file I/O
- Used by: tokio-uring, Glommio, io_uring crate
3. Windows IOCP (Completion Ports)
- Native Windows async I/O
- Similar concept to io_uring
- Used by: ASIO on Windows, Tokio on Windows
Performance Comparison:
Thread Pool io_uring IOCP
| | |
Latency: ~10-50μs ~1-5μs ~1-5μs
Throughput: Good Excellent Excellent
Syscalls/op: 2 0-1 1
Context switches: 2 0 1
Platform: Any Linux 5.1+ Windows
Complexity: Low High High
Blocking vs Non-Blocking: The Fundamental Distinction
Blocking call: Thread is suspended, OS scheduler removes it from ready queue
// Thread 1 is stuck here until disk responds
ssize_t bytes = read(fd, buffer, size); // BLOCKS
Non-blocking call: Returns immediately, check later for completion
// With O_NONBLOCK set on a socket or pipe: returns immediately, even if no data is ready
ssize_t bytes = read(fd, buffer, size); // Returns -1 with errno == EAGAIN when nothing is available
Async/await abstraction: Looks blocking, but coroutine suspends while thread runs other work
// Syntactically synchronous, but non-blocking to the runtime
size_t bytes = co_await file.read(buffer); // Suspends coroutine, not thread
Why This Matters
Understanding async file I/O is crucial because:
- Real-world applications are I/O bound: Most server applications spend time waiting for disk/network, not computing
- Thread pools have costs: Understanding the overhead helps you make architectural decisions
- Production runtimes use these patterns: Tokio, ASIO, libuv all work this way
- io_uring is the future: But understanding thread-based async helps you appreciate io_uring’s improvements
- Debugging async I/O is hard: Knowing the architecture helps you trace problems
Historical Context
The evolution of async I/O in systems programming:
1990s: select/poll
- Wait on multiple file descriptors
- Useful for sockets and pipes only; regular files always report as ready
- O(n) per call, poor scalability
2000s: epoll/kqueue
- O(1) event notification
- Still network-focused
- libevent, libev emerge
2010s: libuv/ASIO
- Thread pool for file I/O
- Event loop for network I/O
- Node.js popularizes the pattern
2019+: io_uring
- True async for everything
- Shared memory ring buffers
- Near-zero syscall overhead
Timeline of Async I/O Patterns:
1983 select() - BSD 4.2
1994 IOCP - Windows NT 3.5
2000 kqueue() - FreeBSD 4.1
2001 poll() - standardized in POSIX.1-2001 (originated earlier in System V)
2002 epoll() - Linux 2.5.44
2008 ASIO - Boost.Asio ships in Boost 1.35, also available standalone
2011 libuv - extracted from the Node.js project
2019 io_uring - Linux 5.1
2020 C++20 - Coroutines standardized
Common Misconceptions
Misconception 1: “Async file I/O doesn’t need threads”
Reality: On most systems, the portable file APIs have no true async mode. The “async” is achieved by moving blocking calls to worker threads. Kernel-level async file I/O requires a platform-specific interface such as io_uring or IOCP.
Misconception 2: “O_NONBLOCK makes file I/O async”
Reality: O_NONBLOCK only works for network sockets and pipes. For regular files, read() and write() always block, regardless of this flag.
Misconception 3: “Async I/O is always faster”
Reality: For single sequential reads, sync I/O is faster (no thread pool overhead). Async I/O shines when:
- You have many concurrent operations
- You can overlap I/O with computation
- You need to maintain responsiveness
Misconception 4: “The thread pool size should match CPU cores”
Reality: I/O thread pools should be sized based on expected I/O concurrency, not CPU cores. A 4-core machine might need 32+ I/O threads if doing many parallel file operations.
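Misconception 2 can be checked directly. The sketch below assumes a POSIX system with a writable /tmp, and its helper names are made up for illustration. It sets O_NONBLOCK on an empty pipe, where read() fails with EAGAIN, and on a regular file, where the flag changes nothing and read() simply returns the data.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>

// Reads from an empty pipe whose read end is O_NONBLOCK.
// Returns the errno reported by read(), or 0 if read() did not fail.
int errno_from_empty_nonblocking_pipe() {
    int fds[2];
    if (::pipe(fds) != 0) return -1;
    int flags = ::fcntl(fds[0], F_GETFL);
    ::fcntl(fds[0], F_SETFL, flags | O_NONBLOCK);
    char byte = 0;
    ssize_t n = ::read(fds[0], &byte, 1);  // nothing has been written yet
    int err = (n < 0) ? errno : 0;         // save errno before close() can change it
    ::close(fds[0]);
    ::close(fds[1]);
    return err;
}

// Reads from a regular file opened with O_NONBLOCK.
// Returns the byte count from read(), or -1 if any step failed.
ssize_t bytes_from_nonblocking_regular_file() {
    char path[] = "/tmp/nonblock_demo_XXXXXX";
    int fd = ::mkstemp(path);
    if (fd < 0) return -1;
    ssize_t written = ::write(fd, "hello", 5);
    ::close(fd);
    fd = ::open(path, O_RDONLY | O_NONBLOCK);
    char buf[16];
    ssize_t n = (fd >= 0 && written == 5) ? ::read(fd, buf, sizeof buf) : -1;
    if (fd >= 0) ::close(fd);
    ::unlink(path);
    return n;
}
```

The pipe call reports EAGAIN because the kernel can tell that no data is available yet. The regular-file call returns the five bytes. On a cold cache it would have waited for the disk despite the flag.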
Project Specification
What You Will Build
An async file I/O library that provides:
- AsyncFile class with co_await-able read() and write() methods
- spawn_blocking() function to offload any synchronous work to a thread pool
- Integration with the Runtime from Project 10
- Proper error handling with errno-to-exception mapping
- RAII-compliant file handle management
API Design
// Open modes
enum class OpenMode {
Read = 0x01,
Write = 0x02,
Create = 0x04,
Truncate = 0x08,
Append = 0x10
};
// Allow bitwise OR
constexpr OpenMode operator|(OpenMode a, OpenMode b);
// The async file handle
class AsyncFile {
public:
// Returns Task that yields AsyncFile when file is opened
static Task<AsyncFile> open(Runtime& runtime,
std::string path,
OpenMode mode);
// Read into buffer, returns bytes read (0 = EOF)
Task<size_t> read(std::span<char> buffer);
// Write from buffer, returns bytes written
Task<size_t> write(std::span<const char> buffer);
// Seek to position
Task<off_t> seek(off_t offset, int whence = SEEK_SET);
// Get current file size
Task<size_t> size();
// Explicit async close (if skipped, the destructor falls back to a synchronous ::close())
Task<void> close();
// File descriptor access (for advanced use)
int fd() const noexcept;
private:
int fd_;
Runtime& runtime_;
bool closed_ = false;
};
// The spawn_blocking primitive
template<typename F>
Task<std::invoke_result_t<F>> spawn_blocking(Runtime& runtime, F&& func);
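The API above only declares operator| for OpenMode. Because an enum class does not convert to bool, testing a flag needs an explicit helper. A minimal sketch follows, assuming POSIX open(2) flags; has_flag and to_posix_flags are hypothetical names, not part of the specified API.

```cpp
#include <cassert>
#include <fcntl.h>

enum class OpenMode : unsigned {
    Read = 0x01,
    Write = 0x02,
    Create = 0x04,
    Truncate = 0x08,
    Append = 0x10
};

constexpr OpenMode operator|(OpenMode a, OpenMode b) {
    return static_cast<OpenMode>(static_cast<unsigned>(a) | static_cast<unsigned>(b));
}

// Hypothetical helper: true if every bit of `flag` is set in `mode`.
constexpr bool has_flag(OpenMode mode, OpenMode flag) {
    return (static_cast<unsigned>(mode) & static_cast<unsigned>(flag)) ==
           static_cast<unsigned>(flag);
}

// Hypothetical helper: translate an OpenMode combination into open(2) flags.
constexpr int to_posix_flags(OpenMode mode) {
    int flags = O_RDONLY;
    if (has_flag(mode, OpenMode::Read) && has_flag(mode, OpenMode::Write)) {
        flags = O_RDWR;
    } else if (has_flag(mode, OpenMode::Write)) {
        flags = O_WRONLY;
    }
    if (has_flag(mode, OpenMode::Create)) flags |= O_CREAT;
    if (has_flag(mode, OpenMode::Truncate)) flags |= O_TRUNC;
    if (has_flag(mode, OpenMode::Append)) flags |= O_APPEND;
    return flags;
}

static_assert(has_flag(OpenMode::Read | OpenMode::Write, OpenMode::Write));
```

Keeping the translation in one constexpr function lets it be unit-tested without touching the filesystem.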
Functional Requirements
- File Opening
- Open files with various mode combinations
- Handle file not found, permission denied, etc.
- Create files when OpenMode::Create is specified
- Reading
- Read up to N bytes into a buffer
- Return actual bytes read (may be less than requested)
- Return 0 on EOF
- Handle partial reads correctly
- Writing
- Write from buffer to file
- Handle partial writes correctly
- Support append mode
- Error Handling
- Convert errno to meaningful exceptions
- Propagate exceptions through coroutine suspension
- Clean up resources on error
- Resource Management
- Close file descriptors when AsyncFile is destroyed
- Handle close() being called multiple times
- Prevent use-after-close
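The partial read and write requirements are easiest to see in synchronous form first. The sketch below uses hypothetical helpers (write_all, read_full, pipe_round_trip) and assumes POSIX. It loops until the whole buffer is transferred and retries when a call is interrupted by a signal.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <string>
#include <unistd.h>

// Writes all `size` bytes, looping over short writes and retrying on EINTR.
// Returns true on success; on failure errno holds the cause.
bool write_all(int fd, const char* data, std::size_t size) {
    while (size > 0) {
        ssize_t n = ::write(fd, data, size);
        if (n < 0) {
            if (errno == EINTR) continue;  // interrupted before any byte was written
            return false;
        }
        data += n;
        size -= static_cast<std::size_t>(n);
    }
    return true;
}

// Reads until `size` bytes arrive or EOF. Returns bytes read, or -1 on error.
ssize_t read_full(int fd, char* out, std::size_t size) {
    std::size_t got = 0;
    while (got < size) {
        ssize_t n = ::read(fd, out + got, size - got);
        if (n < 0) {
            if (errno == EINTR) continue;
            return -1;
        }
        if (n == 0) break;  // EOF
        got += static_cast<std::size_t>(n);
    }
    return static_cast<ssize_t>(got);
}

// Round-trips a short message through a pipe using the two helpers.
std::string pipe_round_trip(const std::string& message) {
    int fds[2];
    if (::pipe(fds) != 0) return {};
    bool ok = write_all(fds[1], message.data(), message.size());
    ::close(fds[1]);  // closing the write end lets the reader observe EOF
    std::string out(message.size() + 8, '\0');
    ssize_t n = ok ? read_full(fds[0], out.data(), out.size()) : -1;
    ::close(fds[0]);
    out.resize(n > 0 ? static_cast<std::size_t>(n) : 0);
    return out;
}
```

In the async library the same loops apply. Each ::read or ::write call would run inside spawn_blocking, or the caller would loop over co_await file.write(...).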
Real World Outcome
When complete, you can write code like this:
Task<void> copy_file(Runtime& runtime, std::string src, std::string dst) {
auto in = co_await AsyncFile::open(runtime, src, OpenMode::Read);
auto out = co_await AsyncFile::open(runtime, dst,
OpenMode::Write | OpenMode::Create | OpenMode::Truncate);
std::vector<char> buffer(64 * 1024); // 64KB buffer
while (true) {
size_t bytes_read = co_await in.read(buffer);
if (bytes_read == 0) break; // EOF
std::span<const char> data(buffer.data(), bytes_read);
while (!data.empty()) {
size_t bytes_written = co_await out.write(data);
data = data.subspan(bytes_written);
}
}
std::cout << "Copy complete!\n";
}
Task<void> process_large_file(Runtime& runtime, std::string path) {
auto file = co_await AsyncFile::open(runtime, path, OpenMode::Read);
std::vector<char> buffer(1024 * 1024); // 1MB buffer
size_t total = 0;
size_t line_count = 0;
while (true) {
size_t bytes = co_await file.read(buffer);
if (bytes == 0) break;
total += bytes;
// Count lines in this chunk
for (size_t i = 0; i < bytes; ++i) {
if (buffer[i] == '\n') ++line_count;
}
}
std::cout << "Processed " << total << " bytes, "
<< line_count << " lines\n";
}
int main() {
Runtime runtime(4); // first argument is async_threads; the I/O pool keeps its default size
runtime.block_on(copy_file(runtime, "large.bin", "copy.bin"));
runtime.block_on(process_large_file(runtime, "data.txt"));
return 0;
}
Expected output:
$ ./async_file_demo
[Runtime] Starting with 4 worker threads
[Worker-1] Starting copy_file
[Worker-2] async_open: opening 'large.bin'
[Worker-2] async_open: success, fd=3
[Worker-3] async_open: opening 'copy.bin' (create)
[Worker-3] async_open: success, fd=4
[Worker-1] async_read: queued for fd=3, size=65536
[Worker-2] async_read: completed 65536 bytes
[Worker-1] async_write: queued for fd=4, size=65536
[Worker-3] async_write: completed 65536 bytes
... (continues, interleaving reads and writes)
[Worker-1] async_read: completed 0 bytes (EOF)
Copy complete!
Copying 10GB file...
Elapsed: 45 seconds
Throughput: 227 MB/s
I/O operations: 163,840 reads, 163,840 writes
Comparison with sync implementation:
Synchronous copy: 52 seconds
Async with overlap: 45 seconds (15% faster due to I/O overlap)
[Worker-1] Starting process_large_file
[Worker-2] async_open: opening 'data.txt'
[Worker-2] async_open: success, fd=5
... (processing)
Processed 1073741824 bytes, 15000000 lines
[Runtime] Shutting down
[Runtime] All tasks complete
Solution Architecture
High-Level Design
┌─────────────────────────────────────────────────────────────────────────────┐
│ User Coroutine Layer │
│ │
│ Task<void> copy_file() { │
│ auto f = co_await AsyncFile::open(...); │
│ auto bytes = co_await f.read(buffer); ◄── Suspends here │
│ co_await f.write(data); ◄── And here │
│ } │
└────────────────────────────────────────┬────────────────────────────────────┘
│
│ Task<T>
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ AsyncFile Class │
│ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ Task<size_t> read(buffer) { │ │
│ │ return spawn_blocking([=] { │ │
│ │ return ::read(fd_, buffer.data(), buffer.size()); │ │
│ │ }); │ │
│ │ } │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────┬────────────────────────────────────┘
│
│ spawn_blocking()
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Runtime (from Project 10) │
│ │
│ ┌──────────────────┐ ┌──────────────────────────────────────────┐ │
│ │ Async Executor │ │ I/O Thread Pool │ │
│ │ │ │ │ │
│ │ Coroutine Queue │◄────►│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ │ │ Worker1 │ │ Worker2 │ │ Worker3 │ │ │
│ │ Ready tasks get │ │ │ blocked │ │ idle │ │ blocked │ │ │
│ │ resumed here │ │ │ on I/O │ │ │ │ on I/O │ │ │
│ └──────────────────┘ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └──────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────┬────────────────────────────────────┘
│
│ Blocking syscalls
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Operating System │
│ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ Kernel │ │
│ │ │ │
│ │ read(fd, buf, n) ──────► Disk Controller ──────► Disk │ │
│ │ │ │
│ │ Thread is blocked until disk responds │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
The spawn_blocking Flow
User Coroutine Runtime I/O Worker Thread
│ │ │
│ co_await spawn_blocking(f) │ │
│ │ │
├────────► queue(f) ─────────►│ │
│ │ │
[suspend] │ ◄──── worker dequeues task ───────┤
│ │ │
│ │ execute f() │
│ │ │ │
│ │ [blocking I/O] │
│ │ │ │
│ │ result = f() │
│ │ │ │
│ ◄────── resume with result ─┼───────────┘ │
│ │ │
[continue] │ │
│ │ │
Key Components
| Component | Responsibility | Key Design Decisions |
|---|---|---|
| AsyncFile | File handle with async operations | Owns fd, holds Runtime reference, RAII close |
| spawn_blocking() | Offload sync work to thread pool | Returns Task |
| BlockingTask | Awaitable for spawn_blocking result | Wraps promise/future for thread communication |
| Runtime | Manages async executor + I/O pool | Separate pools for async tasks vs blocking I/O |
| IOError | Exception for file operation failures | Maps errno to descriptive messages |
Data Structures
// Error handling
class IOError : public std::runtime_error {
public:
IOError(const std::string& operation, int error_code);
int error_code() const noexcept { return error_code_; }
const char* operation() const noexcept { return operation_.c_str(); }
private:
std::string operation_;
int error_code_;
};
// The blocking task awaitable
template<typename T>
class BlockingTask {
public:
BlockingTask(Runtime& runtime, std::function<T()> work);
bool await_ready() const noexcept { return completed_.load(); }
void await_suspend(std::coroutine_handle<> continuation) {
continuation_ = continuation;
runtime_.submit_blocking([this] {
try {
if constexpr (std::is_void_v<T>) {
work_();
} else {
result_.emplace(work_());
}
} catch (...) {
exception_ = std::current_exception();
}
completed_.store(true);
runtime_.schedule(continuation_);
});
}
T await_resume() {
if (exception_) {
std::rethrow_exception(exception_);
}
if constexpr (!std::is_void_v<T>) {
return std::move(*result_);
}
}
private:
Runtime& runtime_;
std::function<T()> work_;
std::coroutine_handle<> continuation_;
std::atomic<bool> completed_{false};
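// std::optional<void> is ill-formed, so store std::monostate when T is void (needs <variant>)
std::optional<std::conditional_t<std::is_void_v<T>, std::monostate, T>> result_;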
std::exception_ptr exception_;
};
// File metadata for debugging/stats
struct FileStats {
size_t total_reads = 0;
size_t total_writes = 0;
size_t bytes_read = 0;
size_t bytes_written = 0;
std::chrono::steady_clock::time_point opened_at;
};
// The async file class
class AsyncFile {
public:
static Task<AsyncFile> open(Runtime& runtime,
std::string path,
OpenMode mode);
AsyncFile(AsyncFile&& other) noexcept;
AsyncFile& operator=(AsyncFile&& other) noexcept;
~AsyncFile();
// Delete copy operations (file handles are unique)
AsyncFile(const AsyncFile&) = delete;
AsyncFile& operator=(const AsyncFile&) = delete;
Task<size_t> read(std::span<char> buffer);
Task<size_t> write(std::span<const char> buffer);
Task<off_t> seek(off_t offset, int whence = SEEK_SET);
Task<size_t> size();
Task<void> close();
int fd() const noexcept { return fd_; }
bool is_open() const noexcept { return fd_ >= 0 && !closed_; }
const FileStats& stats() const noexcept { return stats_; }
private:
AsyncFile(int fd, Runtime& runtime);
int fd_;
Runtime& runtime_;
bool closed_ = false;
FileStats stats_;
};
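AsyncFile declares move operations and deletes copies, but the ownership rules behind that are not spelled out. A minimal sketch of those rules follows, using a hypothetical UniqueFd wrapper that is not part of the project API and assuming POSIX. A moved-from handle must give up its descriptor so that only one object ever closes it.

```cpp
#include <cassert>
#include <fcntl.h>
#include <unistd.h>
#include <utility>

// Hypothetical move-only owner of a POSIX file descriptor.
class UniqueFd {
public:
    UniqueFd() noexcept = default;
    explicit UniqueFd(int fd) noexcept : fd_(fd) {}
    UniqueFd(UniqueFd&& other) noexcept : fd_(std::exchange(other.fd_, -1)) {}
    UniqueFd& operator=(UniqueFd&& other) noexcept {
        if (this != &other) {
            reset();                              // close what we currently own
            fd_ = std::exchange(other.fd_, -1);   // take over, leave source empty
        }
        return *this;
    }
    UniqueFd(const UniqueFd&) = delete;
    UniqueFd& operator=(const UniqueFd&) = delete;
    ~UniqueFd() { reset(); }

    int get() const noexcept { return fd_; }
    bool is_open() const noexcept { return fd_ >= 0; }
    void reset() noexcept {
        if (fd_ >= 0) {
            ::close(fd_);
            fd_ = -1;
        }
    }
private:
    int fd_ = -1;
};

// True if `fd` currently refers to an open descriptor in this process.
bool fd_is_open(int fd) { return ::fcntl(fd, F_GETFD) != -1; }

// Moves a descriptor between two owners and checks it is closed exactly once.
bool demo_move_and_close() {
    int fds[2];
    if (::pipe(fds) != 0) return false;
    UniqueFd write_end(fds[1]);
    int raw = fds[0];
    bool ok = true;
    {
        UniqueFd a(raw);
        UniqueFd b(std::move(a));
        ok = !a.is_open() && b.get() == raw && fd_is_open(raw);
    }  // b closes raw here; a owns nothing, so there is no double close
    return ok && !fd_is_open(raw);
}
```

The same std::exchange pattern applies to AsyncFile's fd_ member in its move constructor and move assignment.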
Error Handling Strategy
// errno to exception mapping
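// Call this immediately after the failing syscall, before anything else can overwrite errno
IOError make_io_error(const std::string& operation) {
int err = errno;
// IOError's constructor appends std::strerror(err) itself, so pass only the operation name
return IOError(operation, err);
}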
// Common error codes and their meanings (illustrative helper, not part of the required API)
const char* describe_errno(int err) {
switch (err) {
case ENOENT: return "file not found";
case EACCES: return "permission denied";
case EEXIST: return "file exists (with O_EXCL)";
case ENOSPC: return "no space left on device";
case EDQUOT: return "disk quota exceeded";
case EIO: return "I/O error (hardware failure)";
case EINTR: return "interrupted (should retry)";
case EAGAIN: return "would block (non-blocking fds only; not expected for regular files)";
default: return "other error";
}
}
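As a point of comparison with the custom IOError class, the standard library already has an errno-carrying exception. The sketch below swaps in std::system_error with std::generic_category. This is a different technique from the one this project specifies, and the helper names are hypothetical.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <string>
#include <system_error>
#include <unistd.h>

// Opens `path` read-only, or throws std::system_error carrying the errno value.
int open_or_throw(const std::string& path) {
    int fd = ::open(path.c_str(), O_RDONLY);
    if (fd < 0) {
        int saved = errno;  // capture immediately, before another call overwrites it
        throw std::system_error(saved, std::generic_category(), "open(" + path + ")");
    }
    return fd;
}

// True if opening a path that should not exist is reported as ENOENT.
bool missing_file_reports_enoent() {
    try {
        ::close(open_or_throw("/nonexistent/path/file.txt"));
    } catch (const std::system_error& e) {
        // error_code compares against the portable std::errc enumerators
        return e.code() == std::errc::no_such_file_or_directory;
    }
    return false;  // the open unexpectedly succeeded
}
```

Either approach propagates through a coroutine the same way: the exception is captured on the worker thread and rethrown from await_resume on the awaiting side.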
Implementation Guide
Development Environment Setup
# Requirements: C++20 compiler with coroutine support
# GCC 10+ or Clang 14+ recommended
# Check compiler version
g++ --version # Need GCC 10+
# or
clang++ --version # Need Clang 14+
# Create project structure
mkdir -p async-file-io/{src,include,tests,examples}
cd async-file-io
# Create CMakeLists.txt
cat > CMakeLists.txt << 'EOF'
cmake_minimum_required(VERSION 3.16)
project(async_file_io CXX)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
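# Coroutine support: GCC 10 needs -fcoroutines (later GCC versions accept it too).
# Clang needs no extra flag with -std=c++20; -fcoroutines-ts targeted the older
# Coroutines TS and has been removed from newer Clang releases.
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
add_compile_options(-fcoroutines)
endif()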
# Thread support
find_package(Threads REQUIRED)
add_library(async_file_io
src/runtime.cpp
src/async_file.cpp
src/io_error.cpp
)
target_include_directories(async_file_io PUBLIC include)
target_link_libraries(async_file_io Threads::Threads)
# Examples
add_executable(file_copy examples/file_copy.cpp)
target_link_libraries(file_copy async_file_io)
add_executable(file_process examples/file_process.cpp)
target_link_libraries(file_process async_file_io)
# Tests
enable_testing()
add_executable(test_async_file tests/test_async_file.cpp)
target_link_libraries(test_async_file async_file_io)
add_test(NAME async_file_tests COMMAND test_async_file)
EOF
Project Structure
async-file-io/
├── include/
│ ├── async_file.hpp # AsyncFile class
│ ├── runtime.hpp # Runtime with I/O pool (from Project 10, extended)
│ ├── task.hpp # Task<T> coroutine type (from Project 10)
│ ├── io_error.hpp # IOError exception class
│ └── spawn_blocking.hpp # spawn_blocking implementation
├── src/
│ ├── async_file.cpp # AsyncFile implementation
│ ├── runtime.cpp # Runtime implementation
│ └── io_error.cpp # IOError implementation
├── examples/
│ ├── file_copy.cpp # Copy file demo
│ ├── file_process.cpp # Process large file demo
│ └── concurrent_io.cpp # Multiple concurrent file operations
├── tests/
│ ├── test_async_file.cpp # Unit tests
│ ├── test_spawn_blocking.cpp
│ └── test_error_handling.cpp
├── CMakeLists.txt
└── README.md
Implementation Phases
Phase 1: spawn_blocking Foundation (Days 1-3)
Goals:
- Extend Runtime with an I/O thread pool
- Implement spawn_blocking primitive
- Test basic offloading
Key Code:
// spawn_blocking.hpp
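// Note: func is a forwarding reference and this function is itself a coroutine, so the
// callable must stay alive until the body has moved or copied it into the lambda below.
// That holds for `co_await spawn_blocking(rt, [] {...})`; taking F by value removes the hazard.
template<typename F>
auto spawn_blocking(Runtime& runtime, F&& func)
-> Task<std::invoke_result_t<F>>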
{
using ReturnType = std::invoke_result_t<F>;
// Create a promise/future pair for cross-thread communication
auto promise = std::make_shared<std::promise<ReturnType>>();
auto future = promise->get_future();
// Submit work to I/O thread pool
runtime.submit_to_io_pool([func = std::forward<F>(func), promise]() mutable {
try {
if constexpr (std::is_void_v<ReturnType>) {
func();
promise->set_value();
} else {
promise->set_value(func());
}
} catch (...) {
promise->set_exception(std::current_exception());
}
});
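// Return a Task that awaits the future.
// make_task_from_future is not defined in this guide; it stands for a helper you write
// that suspends the coroutine and resumes it on the runtime once the future is ready.
co_return co_await make_task_from_future(runtime, std::move(future));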
}
// In runtime.hpp - add I/O pool
class Runtime {
public:
explicit Runtime(size_t async_threads = 0, size_t io_threads = 4);
// Submit to async executor (for coroutines)
void schedule(std::coroutine_handle<> handle);
// Submit to I/O thread pool (for blocking work)
template<typename F>
void submit_to_io_pool(F&& func);
// Run until task completes
template<typename T>
T block_on(Task<T> task);
private:
// Async coroutine executor
std::queue<std::coroutine_handle<>> ready_queue_;
std::mutex queue_mutex_;
std::condition_variable queue_cv_;
std::vector<std::thread> async_workers_;
// Blocking I/O thread pool
std::queue<std::function<void()>> io_queue_;
std::mutex io_mutex_;
std::condition_variable io_cv_;
std::vector<std::thread> io_workers_;
std::atomic<bool> shutdown_{false};
};
Testing checkpoint:
Task<int> test_spawn_blocking(Runtime& runtime) {
int result = co_await spawn_blocking(runtime, [] {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return 42;
});
assert(result == 42);
co_return result;
}
Phase 2: Basic AsyncFile (Days 4-7)
Goals:
- Implement AsyncFile::open()
- Implement read() and write()
- Handle basic error cases
Key Code:
// async_file.cpp
Task<AsyncFile> AsyncFile::open(Runtime& runtime,
std::string path,
OpenMode mode) {
// An enum class has no implicit conversion to bool, so test bits explicitly
auto has = [mode](OpenMode flag) {
return (static_cast<unsigned>(mode) & static_cast<unsigned>(flag)) != 0;
};
int flags = 0;
// Convert OpenMode to POSIX flags
if (has(OpenMode::Read) && has(OpenMode::Write)) {
flags = O_RDWR;
} else if (has(OpenMode::Write)) {
flags = O_WRONLY;
} else {
flags = O_RDONLY;
}
if (has(OpenMode::Create)) flags |= O_CREAT;
if (has(OpenMode::Truncate)) flags |= O_TRUNC;
if (has(OpenMode::Append)) flags |= O_APPEND;
int fd = co_await spawn_blocking(runtime, [path, flags]() {
int fd = ::open(path.c_str(), flags, 0644);
if (fd < 0) {
throw make_io_error("open(" + path + ")");
}
return fd;
});
co_return AsyncFile(fd, runtime);
}
Task<size_t> AsyncFile::read(std::span<char> buffer) {
if (!is_open()) {
throw IOError("read on closed file", EBADF);
}
int fd = fd_; // Capture by value
ssize_t result = co_await spawn_blocking(runtime_, [fd, buffer]() {
ssize_t bytes = ::read(fd, buffer.data(), buffer.size());
if (bytes < 0) {
throw make_io_error("read");
}
return bytes;
});
stats_.total_reads++;
stats_.bytes_read += result;
co_return static_cast<size_t>(result);
}
Task<size_t> AsyncFile::write(std::span<const char> buffer) {
if (!is_open()) {
throw IOError("write on closed file", EBADF);
}
int fd = fd_;
ssize_t result = co_await spawn_blocking(runtime_, [fd, buffer]() {
ssize_t bytes = ::write(fd, buffer.data(), buffer.size());
if (bytes < 0) {
throw make_io_error("write");
}
return bytes;
});
stats_.total_writes++;
stats_.bytes_written += result;
co_return static_cast<size_t>(result);
}
Testing checkpoint:
Task<void> test_read_write(Runtime& runtime) {
// Write test
{
auto file = co_await AsyncFile::open(runtime, "/tmp/test.txt",
OpenMode::Write | OpenMode::Create | OpenMode::Truncate);
std::string data = "Hello, async world!";
size_t written = co_await file.write(std::span(data));
assert(written == data.size());
}
// Read test
{
auto file = co_await AsyncFile::open(runtime, "/tmp/test.txt",
OpenMode::Read);
std::vector<char> buffer(100);
size_t read = co_await file.read(buffer);
std::string result(buffer.data(), read);
assert(result == "Hello, async world!");
}
}
Phase 3: Error Handling & RAII (Days 8-10)
Goals:
- Robust errno handling
- Proper destructor behavior
- Exception propagation through coroutines
Key Code:
// io_error.hpp
class IOError : public std::runtime_error {
public:
IOError(const std::string& message, int error_code)
: std::runtime_error(format_message(message, error_code))
, error_code_(error_code) {}
int error_code() const noexcept { return error_code_; }
// Convenience methods
bool is_not_found() const { return error_code_ == ENOENT; }
bool is_permission_denied() const { return error_code_ == EACCES; }
bool is_disk_full() const { return error_code_ == ENOSPC; }
private:
static std::string format_message(const std::string& op, int err) {
return op + ": " + std::strerror(err) + " (errno=" + std::to_string(err) + ")";
}
int error_code_;
};
// async_file.cpp - destructor handling
AsyncFile::~AsyncFile() {
if (fd_ >= 0 && !closed_) {
// Cannot co_await in destructor, so do sync close
// This is a known limitation - user should call close() explicitly
// for proper async cleanup
int result = ::close(fd_);
if (result < 0) {
// Log error but don't throw from destructor
std::cerr << "Warning: close() failed in destructor: "
<< std::strerror(errno) << std::endl;
}
}
}
Task<void> AsyncFile::close() {
if (!is_open()) {
co_return; // Already closed
}
int fd = fd_;
fd_ = -1;
closed_ = true;
co_await spawn_blocking(runtime_, [fd]() {
if (::close(fd) < 0) {
throw make_io_error("close");
}
});
}
Testing checkpoint:
Task<void> test_error_handling(Runtime& runtime) {
// Test file not found
try {
auto file = co_await AsyncFile::open(runtime, "/nonexistent/path/file.txt",
OpenMode::Read);
assert(false && "Should have thrown");
} catch (const IOError& e) {
assert(e.is_not_found());
}
// Test permission denied (if not root)
try {
auto file = co_await AsyncFile::open(runtime, "/etc/shadow",
OpenMode::Read);
// If we got here, we're root - skip test
} catch (const IOError& e) {
assert(e.is_permission_denied());
}
}
Phase 4: Full File Copy Implementation (Days 11-12)
Goals:
- Complete copy_file implementation
- Handle partial reads/writes
- Measure performance
Key Code:
// Complete file copy with proper partial write handling
Task<void> copy_file(Runtime& runtime,
std::string src,
std::string dst,
size_t buffer_size = 64 * 1024) {
auto in = co_await AsyncFile::open(runtime, src, OpenMode::Read);
auto out = co_await AsyncFile::open(runtime, dst,
OpenMode::Write | OpenMode::Create | OpenMode::Truncate);
std::vector<char> buffer(buffer_size);
size_t total_copied = 0;
auto start_time = std::chrono::steady_clock::now();
while (true) {
size_t bytes_read = co_await in.read(buffer);
if (bytes_read == 0) break; // EOF
// Handle partial writes
std::span<const char> remaining(buffer.data(), bytes_read);
while (!remaining.empty()) {
size_t bytes_written = co_await out.write(remaining);
remaining = remaining.subspan(bytes_written);
}
total_copied += bytes_read;
}
auto end_time = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
end_time - start_time);
double throughput_mbps = (total_copied / 1024.0 / 1024.0) /
(duration.count() / 1000.0);
std::cout << "Copied " << total_copied << " bytes in "
<< duration.count() << "ms ("
<< throughput_mbps << " MB/s)\n";
co_await in.close();
co_await out.close();
}
Phase 5: Performance Optimizations (Days 13-14)
Goals:
- Implement readahead (start next read while processing current)
- Implement write buffering
- Benchmark and tune
Readahead Pattern:
// Advanced: Overlapped reads for better throughput
Task<void> copy_file_overlapped(Runtime& runtime,
std::string src,
std::string dst) {
auto in = co_await AsyncFile::open(runtime, src, OpenMode::Read);
auto out = co_await AsyncFile::open(runtime, dst,
OpenMode::Write | OpenMode::Create | OpenMode::Truncate);
constexpr size_t BUFFER_SIZE = 64 * 1024;
std::vector<char> buffer1(BUFFER_SIZE);
std::vector<char> buffer2(BUFFER_SIZE);
// Start first read
size_t bytes_in_buf1 = co_await in.read(buffer1);
while (bytes_in_buf1 > 0) {
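// Start next read. It overlaps with the write only if Task starts eagerly or is spawned
// onto the runtime; a lazy Task would not begin until the co_await further down.
Task<size_t> next_read = in.read(buffer2);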
// Write current buffer
std::span<const char> remaining(buffer1.data(), bytes_in_buf1);
while (!remaining.empty()) {
size_t written = co_await out.write(remaining);
remaining = remaining.subspan(written);
}
// Wait for next read to complete
bytes_in_buf1 = co_await std::move(next_read);
// Swap buffers
std::swap(buffer1, buffer2);
}
}
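The double-buffering idea can also be tried without any coroutine machinery. The sketch below swaps in std::async and pread as stand-ins for the Task-based read. It assumes POSIX and a writable /tmp, and copy_overlapped and demo_copy are hypothetical names. While chunk N is being written, chunk N+1 is already being read on another thread.

```cpp
#include <cassert>
#include <cstddef>
#include <fcntl.h>
#include <future>
#include <stdlib.h>
#include <string>
#include <unistd.h>
#include <utility>
#include <vector>

// Copies src_fd to dst_fd, reading the next chunk while the current one is written.
// Returns the total bytes copied, or -1 on error.
long long copy_overlapped(int src_fd, int dst_fd, std::size_t chunk) {
    std::vector<char> current(chunk), next(chunk);
    off_t offset = 0;
    ssize_t have = ::pread(src_fd, current.data(), current.size(), offset);
    long long total = 0;
    while (have > 0) {
        offset += have;
        // Kick off the next read before writing the current chunk.
        auto pending = std::async(std::launch::async, [&next, src_fd, offset] {
            return ::pread(src_fd, next.data(), next.size(), offset);
        });
        const char* p = current.data();
        ssize_t left = have;
        while (left > 0) {  // handle partial writes
            ssize_t n = ::write(dst_fd, p, static_cast<std::size_t>(left));
            if (n < 0) { pending.wait(); return -1; }
            p += n;
            left -= n;
        }
        total += have;
        have = pending.get();      // wait for the overlapped read
        std::swap(current, next);  // the freshly read data becomes current
    }
    return have < 0 ? -1 : total;
}

// Copies a temporary file of `file_size` bytes; returns bytes copied, or -1 on failure.
long long demo_copy(std::size_t file_size, std::size_t chunk) {
    char src[] = "/tmp/overlap_src_XXXXXX";
    char dst[] = "/tmp/overlap_dst_XXXXXX";
    int in = ::mkstemp(src);
    int out = ::mkstemp(dst);
    long long copied = -1;
    if (in >= 0 && out >= 0) {
        std::string payload(file_size, 'X');
        if (::write(in, payload.data(), payload.size()) == static_cast<ssize_t>(file_size)) {
            copied = copy_overlapped(in, out, chunk);
            // The destination must end up exactly as long as the source.
            if (copied >= 0 && ::lseek(out, 0, SEEK_END) != copied) copied = -1;
        }
    }
    if (in >= 0) { ::close(in); ::unlink(src); }
    if (out >= 0) { ::close(out); ::unlink(dst); }
    return copied;
}
```

Using pread with explicit offsets keeps the background read independent of the file position, which avoids two in-flight operations racing on a shared offset.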
Write Buffering Pattern:
// Buffered writer that batches small writes
class BufferedAsyncWriter {
public:
BufferedAsyncWriter(AsyncFile& file, size_t buffer_size = 64 * 1024)
: file_(file), buffer_(buffer_size), used_(0) {}
Task<void> write(std::span<const char> data) {
while (!data.empty()) {
size_t space = buffer_.size() - used_;
size_t to_copy = std::min(space, data.size());
std::memcpy(buffer_.data() + used_, data.data(), to_copy);
used_ += to_copy;
data = data.subspan(to_copy);
if (used_ == buffer_.size()) {
co_await flush();
}
}
}
Task<void> flush() {
if (used_ == 0) co_return;
std::span<const char> remaining(buffer_.data(), used_);
while (!remaining.empty()) {
size_t written = co_await file_.write(remaining);
remaining = remaining.subspan(written);
}
used_ = 0;
}
private:
AsyncFile& file_;
std::vector<char> buffer_;
size_t used_;
};
Testing Strategy
Test Categories
| Category | Purpose | Examples |
|---|---|---|
| Unit Tests | Test individual components | spawn_blocking returns correct value |
| Integration Tests | Test full async flow | File copy works end-to-end |
| Error Tests | Test error handling | File not found throws IOError |
| Stress Tests | Test under load | 1000 concurrent file operations |
| Performance Tests | Measure throughput | Compare to sync implementation |
Critical Test Cases
// Test 1: Basic spawn_blocking
TEST(SpawnBlocking, ReturnsValue) {
Runtime runtime(2, 4);
auto result = runtime.block_on(spawn_blocking(runtime, [] { return 42; }));
EXPECT_EQ(result, 42);
}
// Test 2: spawn_blocking exception propagation
TEST(SpawnBlocking, PropagatesExceptions) {
Runtime runtime(2, 4);
EXPECT_THROW({
runtime.block_on(spawn_blocking(runtime, [] {
throw std::runtime_error("test error");
return 0;
}));
}, std::runtime_error);
}
// Test 3: Basic file read
TEST(AsyncFile, ReadSmallFile) {
Runtime runtime(2, 4);
// Create test file
std::ofstream("/tmp/test_read.txt") << "Hello, World!";
auto task = [](Runtime& rt) -> Task<std::string> {
auto file = co_await AsyncFile::open(rt, "/tmp/test_read.txt", OpenMode::Read);
std::vector<char> buffer(100);
size_t bytes = co_await file.read(buffer);
co_return std::string(buffer.data(), bytes);
};
std::string content = runtime.block_on(task(runtime));
EXPECT_EQ(content, "Hello, World!");
}
// Test 4: Large file copy
TEST(AsyncFile, CopyLargeFile) {
Runtime runtime(2, 4);
// Create 10MB test file
{
std::ofstream out("/tmp/test_large.bin", std::ios::binary);
std::vector<char> data(1024 * 1024, 'X');
for (int i = 0; i < 10; ++i) {
out.write(data.data(), data.size());
}
}
runtime.block_on(copy_file(runtime, "/tmp/test_large.bin", "/tmp/test_copy.bin"));
// Verify sizes match
std::ifstream in1("/tmp/test_large.bin", std::ios::binary | std::ios::ate);
std::ifstream in2("/tmp/test_copy.bin", std::ios::binary | std::ios::ate);
EXPECT_EQ(in1.tellg(), in2.tellg());
}
// Test 5: Concurrent operations
TEST(AsyncFile, ConcurrentReads) {
Runtime runtime(2, 8);
// Create test files
for (int i = 0; i < 10; ++i) {
std::ofstream("/tmp/test_" + std::to_string(i) + ".txt")
<< "File " << i;
}
auto task = [](Runtime& rt) -> Task<void> {
std::vector<Task<std::string>> tasks;
for (int i = 0; i < 10; ++i) {
tasks.push_back([](Runtime& rt, int idx) -> Task<std::string> {
auto file = co_await AsyncFile::open(rt,
"/tmp/test_" + std::to_string(idx) + ".txt", OpenMode::Read);
std::vector<char> buffer(100);
size_t bytes = co_await file.read(buffer);
co_return std::string(buffer.data(), bytes);
}(rt, i));
}
for (int i = 0; i < 10; ++i) {
std::string content = co_await std::move(tasks[i]);
assert(content == "File " + std::to_string(i));
}
};
runtime.block_on(task(runtime));
}
// Test 6: Error handling
TEST(AsyncFile, FileNotFound) {
Runtime runtime(2, 4);
auto task = [](Runtime& rt) -> Task<void> {
try {
auto file = co_await AsyncFile::open(rt, "/nonexistent/file.txt", OpenMode::Read);
ADD_FAILURE() << "Should have thrown"; // FAIL() expands to a return statement, which is not allowed in a coroutine
} catch (const IOError& e) {
EXPECT_TRUE(e.is_not_found());
}
};
runtime.block_on(task(runtime));
}
// Test 7: EOF handling
TEST(AsyncFile, ReturnsZeroOnEOF) {
Runtime runtime(2, 4);
std::ofstream("/tmp/test_eof.txt") << "short";
auto task = [](Runtime& rt) -> Task<void> {
auto file = co_await AsyncFile::open(rt, "/tmp/test_eof.txt", OpenMode::Read);
std::vector<char> buffer(1024);
size_t bytes1 = co_await file.read(buffer);
EXPECT_EQ(bytes1, 5);
size_t bytes2 = co_await file.read(buffer);
EXPECT_EQ(bytes2, 0); // EOF
};
runtime.block_on(task(runtime));
}
Performance Benchmarks
void benchmark_file_copy(Runtime& runtime) {
// Create 1GB test file
const size_t FILE_SIZE = 1024 * 1024 * 1024;
{
std::ofstream out("/tmp/bench_src.bin", std::ios::binary);
std::vector<char> chunk(1024 * 1024, 'X');
for (size_t written = 0; written < FILE_SIZE; written += chunk.size()) {
out.write(chunk.data(), chunk.size());
}
}
// Benchmark sync copy
auto sync_start = std::chrono::steady_clock::now();
{
std::ifstream in("/tmp/bench_src.bin", std::ios::binary);
std::ofstream out("/tmp/bench_sync.bin", std::ios::binary);
std::vector<char> buffer(64 * 1024);
while (in.read(buffer.data(), buffer.size()) || in.gcount() > 0) {
out.write(buffer.data(), in.gcount());
}
}
auto sync_duration = std::chrono::steady_clock::now() - sync_start;
// Benchmark async copy
auto async_start = std::chrono::steady_clock::now();
runtime.block_on(copy_file(runtime, "/tmp/bench_src.bin", "/tmp/bench_async.bin"));
auto async_duration = std::chrono::steady_clock::now() - async_start;
// Benchmark overlapped copy
auto overlap_start = std::chrono::steady_clock::now();
runtime.block_on(copy_file_overlapped(runtime, "/tmp/bench_src.bin", "/tmp/bench_overlap.bin"));
auto overlap_duration = std::chrono::steady_clock::now() - overlap_start;
std::cout << "Sync: " << std::chrono::duration_cast<std::chrono::milliseconds>(sync_duration).count() << "ms\n";
std::cout << "Async: " << std::chrono::duration_cast<std::chrono::milliseconds>(async_duration).count() << "ms\n";
std::cout << "Overlapped: " << std::chrono::duration_cast<std::chrono::milliseconds>(overlap_duration).count() << "ms\n";
}
Common Pitfalls & Debugging
Frequent Mistakes
| Pitfall | Symptom | Solution |
|---|---|---|
| Capturing this by reference | Use-after-free crash | Capture fd_ by value, not this |
| Forgetting partial writes | Data corruption | Loop until all bytes written |
| Throwing from destructor | Program termination | Log errors, don’t throw |
| Using file after close | EBADF errors | Track closed state, check before operations |
| Thread pool too small | Poor concurrency | Size I/O pool based on expected concurrency |
| Capturing span by value and assuming it owns the data | Dangling pointer into a freed buffer | Ensure buffer outlives the operation |
Debugging Strategies
1. Trace spawn_blocking flow:
template<typename F>
auto spawn_blocking(Runtime& runtime, F&& func) -> Task<std::invoke_result_t<F>> {
std::cerr << "[spawn_blocking] Queueing work on thread "
<< std::this_thread::get_id() << "\n";
// ... implementation ...
std::cerr << "[spawn_blocking] Resuming on thread "
<< std::this_thread::get_id() << "\n";
}
2. Track file descriptor lifecycle:
AsyncFile::AsyncFile(int fd, Runtime& runtime)
: fd_(fd), runtime_(runtime) {
std::cerr << "[AsyncFile] Opened fd=" << fd_ << "\n";
}
AsyncFile::~AsyncFile() {
std::cerr << "[AsyncFile] Destructor fd=" << fd_
<< " closed=" << closed_ << "\n";
}
3. Verify thread usage:
Task<size_t> AsyncFile::read(std::span<char> buffer) {
auto caller_thread = std::this_thread::get_id();
// Capture fd_ and the thread id by value so the closure never touches `this`
ssize_t result = co_await spawn_blocking(runtime_, [fd = fd_, buffer, caller_thread]() {
assert(std::this_thread::get_id() != caller_thread
&& "I/O should happen on different thread!");
return ::read(fd, buffer.data(), buffer.size());
});
// Should be back on async thread (or original)
// Error handling for result < 0 is omitted in this debugging sketch
co_return static_cast<size_t>(result);
}
4. Use AddressSanitizer for memory issues:
cmake -DCMAKE_CXX_FLAGS="-fsanitize=address -g" ..
5. Use ThreadSanitizer for race conditions:
cmake -DCMAKE_CXX_FLAGS="-fsanitize=thread -g" ..
Performance Issues
| Issue | Symptom | Fix |
|---|---|---|
| Too many small I/Os | High CPU, low throughput | Use larger buffers, buffered writer |
| I/O pool contention | High latency variance | Increase pool size |
| Thread creation overhead | Slow startup | Pre-create thread pool |
| Memory allocation in hot path | Poor cache performance | Reuse buffers |
| Frequent coroutine suspensions | Overhead | Batch operations |
Extensions & Challenges
Extension 1: io_uring Integration (Linux 5.1+)
Replace the thread pool with true kernel async I/O:
class IOURingRuntime {
public:
IOURingRuntime(unsigned entries = 256);
// Submit read operation
Task<size_t> async_read(int fd, std::span<char> buffer, off_t offset);
// Submit write operation
Task<size_t> async_write(int fd, std::span<const char> buffer, off_t offset);
// Process completions
void poll();
private:
struct io_uring ring_;
std::unordered_map<uint64_t, std::coroutine_handle<>> pending_;
};
Key concepts:
- Submission Queue (SQ): User submits I/O requests
- Completion Queue (CQ): Kernel posts completions
- No thread pool needed - kernel handles async
- Can do zero-copy with registered buffers
Extension 2: Memory-Mapped File Support
class AsyncMappedFile {
public:
static Task<AsyncMappedFile> open(Runtime& runtime,
std::string path,
size_t size = 0);
// Get a span to mapped memory (sync access, no copy)
std::span<char> data();
// Async sync to disk
Task<void> sync(bool async = true);
private:
void* mapping_;
size_t size_;
int fd_;
};
Extension 3: Directory Operations
Task<std::vector<std::string>> readdir(Runtime& runtime, std::string path);
Task<void> mkdir(Runtime& runtime, std::string path, mode_t mode = 0755);
Task<void> remove(Runtime& runtime, std::string path);
Task<void> rename(Runtime& runtime, std::string from, std::string to);
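The blocking bodies behind these signatures could be built on std::filesystem, as in the sketch below. The function names `list_dir_blocking` and `rename_blocking` are hypothetical; each is assumed to be wrapped in spawn_blocking by the corresponding Task-returning function.

```cpp
#include <algorithm>
#include <cassert>
#include <filesystem>
#include <string>
#include <system_error>
#include <vector>

namespace fs = std::filesystem;

// Hypothetical blocking body for readdir(): returns entry names, sorted
// because directory iteration order is unspecified.
inline std::vector<std::string> list_dir_blocking(const std::string& path) {
    std::vector<std::string> names;
    for (const fs::directory_entry& entry : fs::directory_iterator(path)) {
        names.push_back(entry.path().filename().string());
    }
    std::sort(names.begin(), names.end());
    return names;
}

// Hypothetical blocking body for rename().
// fs::rename throws fs::filesystem_error (a std::system_error) on failure,
// so the error code survives the trip back through the coroutine.
inline void rename_blocking(const std::string& from, const std::string& to) {
    fs::rename(from, to);
}
```

If the free functions keep the names `remove` and `rename`, placing them in a namespace avoids clashing with the C library functions of the same name.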
Extension 4: Watching for File Changes
class FileWatcher {
public:
Task<void> watch(Runtime& runtime, std::string path);
// Returns when file changes
Task<FileEvent> next_event();
private:
int inotify_fd_;
Runtime& runtime_;
};
Extension 5: Async Network I/O
Extend the library to support sockets using epoll/kqueue:
class AsyncSocket {
public:
static Task<AsyncSocket> connect(Runtime& runtime,
std::string host,
uint16_t port);
Task<size_t> read(std::span<char> buffer);
Task<size_t> write(std::span<const char> buffer);
Task<void> close();
};
This would use the event loop pattern rather than thread pool offloading.
Resources
Essential Reading
- “The Linux Programming Interface” by Michael Kerrisk
- Chapter 4: File I/O: The Universal I/O Model
- Chapter 5: File I/O: Further Details
- Chapter 63: Alternative I/O Models
- “C++ Concurrency in Action” by Anthony Williams
- Chapter 4: Synchronizing concurrent operations
- Chapter 9: Advanced thread management
Video Resources
- “C++20 Coroutines” - CppCon talks by various speakers
- “io_uring: The Lord of the Rings” - Jonathan Corbet
- “Asio: Asynchronous I/O” - Christopher Kohlhoff
Related Projects
- cppcoro: Lewis Baker’s coroutine library
- liburing: io_uring wrapper library
- libuv: Node.js async I/O library
- ASIO: Boost/standalone async I/O
Books That Will Help
| Topic | Book | Chapter |
|---|---|---|
| POSIX File I/O | The Linux Programming Interface | Ch. 4-5, 63 |
| Async patterns | C++ Concurrency in Action | Ch. 4, 9 |
| Thread pools | The Art of Multiprocessor Programming | Ch. 16 |
| Coroutines | (Online resources - see cppreference) | N/A |
| io_uring | (Kernel documentation + blog posts) | N/A |
| Systems programming | Computer Systems: A Programmer’s Perspective | Ch. 10 |
Self-Assessment Checklist
Before considering this project complete, verify:
Understanding
- I can explain why traditional file I/O is blocking and how spawn_blocking makes it “async”
- I understand the difference between true async I/O (io_uring) and thread-pool-based async
- I can describe how the coroutine suspension/resumption works for spawn_blocking
- I understand why we need separate thread pools for async coroutines vs blocking I/O
- I can explain the performance tradeoffs of buffer sizes
- I know when async file I/O provides benefits over sync I/O
Implementation
- spawn_blocking correctly offloads work and returns results
- AsyncFile::open handles all mode combinations
- read() returns correct byte counts including 0 for EOF
- write() handles partial writes correctly
- Errors propagate as exceptions through coroutine suspension
- File descriptors are properly closed (RAII + explicit close)
- No memory leaks or use-after-free issues
Testing
- Unit tests pass for all components
- Integration tests cover full file copy workflow
- Error handling tests verify exception behavior
- Performance benchmarks show expected characteristics
- Stress tests with concurrent operations pass
- AddressSanitizer and ThreadSanitizer report no issues
Growth
- I’ve benchmarked my implementation against sync I/O
- I understand how production runtimes (Tokio, ASIO) structure their I/O
- I can identify bottlenecks in async I/O code
- I’m ready to explore io_uring for true async I/O
The Interview Questions They’ll Ask
After completing this project, you’ll be ready for these questions:
- “How do async file operations work under the hood?”
- They want: Traditional file I/O is blocking; async is achieved by offloading to a thread pool that performs blocking calls, then resuming the waiting coroutine when complete. io_uring provides true kernel async.
- “What’s the difference between io_uring and thread-pool-based async I/O?”
- They want: A thread pool pays context-switch overhead and ties up one thread per in-flight blocking operation. io_uring uses shared kernel submission/completion queues, can batch many operations per syscall, and makes zero-copy possible with registered buffers.
- “How do you handle errors in async I/O?”
- They want: Map errno to exceptions, propagate through coroutine suspension points, ensure resource cleanup happens regardless of error path.
- “Why can’t you use O_NONBLOCK for file I/O?”
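- They want: O_NONBLOCK is effectively ignored for regular files. It changes behavior for pipes, FIFOs, sockets, and terminals, but the kernel treats a regular file as always ready, so read() can still block while the data is fetched from disk.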
- “How would you implement a high-performance file copy?”
- They want: Overlapping reads and writes using double buffering, appropriate buffer sizes (typically 64KB-1MB), possibly using sendfile() or splice() for kernel-level optimization.
- “What’s the relationship between spawn_blocking and the async executor?”
- They want: spawn_blocking queues work to an I/O thread pool (different from async executor threads), returns a Task that suspends the calling coroutine until the blocking work completes.
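For the error-handling question, the errno-to-exception mapping can be sketched as below. Only the `IOError` name and `is_not_found()` come from the tests in this guide; the constructor, the std::system_error base, and the `open_or_throw` helper are assumptions made for illustration.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <string>
#include <system_error>
#include <unistd.h>

// Sketch of an errno-carrying exception type (assumed design, not the
// project's definitive class).
class IOError : public std::system_error {
public:
    IOError(int err, const std::string& what)
        : std::system_error(err, std::generic_category(), what) {}
    bool is_not_found() const noexcept { return code().value() == ENOENT; }
};

// Hypothetical blocking helper, meant to run inside spawn_blocking.
// errno is thread-local, so it must be read on the worker thread right after
// the failing call; by the time the coroutine resumes elsewhere it is gone.
inline int open_or_throw(const std::string& path, int flags) {
    int fd = ::open(path.c_str(), flags);
    if (fd < 0) {
        int saved = errno;  // capture before any other call can overwrite it
        throw IOError(saved, "open " + path);
    }
    return fd;
}
```

The exception thrown on the worker thread then has to be stored in the task's promise and rethrown when the awaiting coroutine resumes, which is what lets a try/catch around co_await see it.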
Submission / Completion Criteria
Minimum Viable Completion:
- spawn_blocking works correctly
- AsyncFile can open, read, write, and close
- Basic file copy example runs successfully
- Error handling works for common cases
Full Completion:
- All AsyncFile operations implemented
- Robust error handling with IOError class
- Performance benchmarks show reasonable throughput
- Unit tests for all components pass
- Documentation explains design decisions
Excellence (Going Above & Beyond):
- Overlapped read/write for improved throughput
- Buffered writer for small writes
- io_uring integration (Linux 5.1+)
- Memory-mapped file support
- Directory operation support
- Comprehensive benchmark suite
This guide was expanded from LEARN_CPP_CONCURRENCY_AND_PARALLELISM.md. For the complete learning path, see the project index.