Project 11: Async File I/O Library

Build an async file I/O library where read/write operations are non-blocking coroutines, backed by a thread pool that simulates production async I/O patterns.


Quick Reference

Attribute        Value
Language         C++20 (coroutines required)
Difficulty       Expert
Time             2-3 weeks
Prerequisites    Project 3 (Thread Pool), Project 10 (Async Task Framework)
Coolness         Level 4: Hardcore Tech Flex
Portfolio Value  Service & Support Model

Learning Objectives

By completing this project, you will be able to:

  1. Implement spawn_blocking for thread offloading: Create a mechanism that queues synchronous work to a thread pool and returns a Task that completes when the work finishes
  2. Design AsyncFile with coroutine-based read/write: Build file handles whose operations return Task<size_t> that can be co_awaited without blocking the async runtime
  3. Bridge synchronous and asynchronous APIs: Understand why traditional file I/O is blocking and how to wrap it in non-blocking coroutine interfaces
  4. Handle errno-based errors in async contexts: Map C-style errno errors to C++ exceptions that propagate correctly through coroutine suspension points
  5. Implement proper RAII for async file handles: Ensure file descriptors are correctly managed across async lifetimes and coroutine frames
  6. Understand the architecture of production async runtimes: See how Tokio, ASIO, and libuv actually structure their I/O layers
  7. Optimize with readahead and write buffering: Implement performance optimizations that overlap I/O with computation
  8. Prepare for io_uring integration: Understand the limitations of thread-pool-based async I/O and how true kernel async I/O eliminates them
  9. Profile and benchmark async I/O performance: Measure throughput, latency, and thread utilization to verify your implementation
  10. Debug async I/O issues: Trace through coroutine suspension to find bugs in async file operations

Theoretical Foundation

Core Concepts

Why File I/O Needs Special Handling

Traditional file I/O in POSIX systems is synchronous and blocking. When you call read() or write(), the calling thread is suspended until the kernel completes the operation:

Traditional Blocking I/O:

Thread               Kernel               Disk
  |                    |                   |
  +-- read(fd) ------->|                   |
  |   [BLOCKED]        +-- disk read ----->|
  |   [BLOCKED]        |<-- data ----------+
  |<-- return data ----+                   |
  |                    |                   |
  [Thread was idle for entire disk access time]

This is problematic for async programs because:

  1. Thread starvation: Blocking threads can’t run other coroutines
  2. Wasted resources: Threads consume memory even when waiting
  3. Poor scalability: Need one thread per concurrent I/O operation

The Async I/O Solution

Async runtimes solve this by separating the “request” from the “completion”:

Async I/O with Thread Pool:

Coroutine            Thread Pool           Kernel           Disk
    |                    |                   |                |
 co_await read()         |                   |                |
    |                    |                   |                |
 [suspend] -----------> queue work          |                |
    |                   Worker picks up     |                |
    |                    +-- read(fd) ----->|                |
    |                    |   [blocked]       +-- disk ------>|
    |                    |   [blocked]       |<-- data ------+
    |                    |<-- return --------+                |
    |<----- resume ------+                   |                |
    |                    |                   |                |
[Coroutine was suspended, runtime ran other tasks]
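
The request/completion split in the diagram can be tried without any coroutine machinery. The following is a minimal sketch, assuming a POSIX system; `read_file_offloaded` is a hypothetical helper, and `std::async` stands in for the thread pool that this project builds later.

```cpp
#include <fcntl.h>
#include <unistd.h>

#include <cerrno>
#include <cstddef>
#include <future>
#include <string>
#include <system_error>
#include <utility>
#include <vector>

// Hypothetical helper: performs a blocking open/read/close on another thread
// and returns a future that becomes ready when the read has completed.
std::future<std::vector<char>> read_file_offloaded(std::string path,
                                                   std::size_t max_bytes) {
    return std::async(std::launch::async, [path = std::move(path), max_bytes] {
        int fd = ::open(path.c_str(), O_RDONLY);
        if (fd < 0) {
            throw std::system_error(errno, std::generic_category(), "open");
        }
        std::vector<char> buffer(max_bytes);
        ssize_t n = ::read(fd, buffer.data(), buffer.size());  // blocks this worker only
        int saved_errno = errno;  // close() may overwrite errno
        ::close(fd);
        if (n < 0) {
            throw std::system_error(saved_errno, std::generic_category(), "read");
        }
        buffer.resize(static_cast<std::size_t>(n));
        return buffer;
    });
}
```

The caller is free to do other work between requesting the read and calling `get()` on the future. The coroutine version in this project replaces that explicit `get()` with a `co_await` that suspends instead of blocking.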

Three Approaches to Async File I/O

1. Thread Pool Offloading (This Project)

  • Queue blocking I/O to dedicated threads
  • Simple to implement, works everywhere
  • Overhead: context switches, thread pool management
  • Used by: Node.js (libuv), Tokio (tokio::fs)

2. io_uring (Linux 5.1+)

  • Kernel-based async I/O with shared ring buffers
  • Batched submission with minimal syscalls; zero-copy is possible with registered buffers
  • Highest performance for file I/O
  • Used by: tokio-uring, Glommio, the io-uring crate

3. Windows IOCP (Completion Ports)

  • Native Windows async I/O
  • Similar concept to io_uring
  • Used by: ASIO on Windows, Tokio on Windows

Performance Comparison (rough, order-of-magnitude figures):

                Thread Pool      io_uring       IOCP
                    |                |            |
Latency:         ~10-50μs        ~1-5μs        ~1-5μs
Throughput:       Good          Excellent     Excellent
Syscalls/op:        2              0-1           1
Context switches:   2              0             1
Platform:         Any           Linux 5.1+    Windows
Complexity:       Low            High          High

Blocking vs Non-Blocking: The Fundamental Distinction

Blocking call: Thread is suspended, OS scheduler removes it from ready queue

// Thread 1 is stuck here until disk responds
ssize_t bytes = read(fd, buffer, size);  // BLOCKS

Non-blocking call: Returns immediately, check later for completion

// On an O_NONBLOCK socket or pipe with no data ready, returns immediately
ssize_t bytes = read(fd, buffer, size);  // Returns -1 with errno == EAGAIN

Async/await abstraction: Looks blocking, but coroutine suspends while thread runs other work

// Syntactically synchronous, but non-blocking to the runtime
size_t bytes = co_await file.read(buffer);  // Suspends coroutine, not thread

Why This Matters

Understanding async file I/O is crucial because:

  1. Real-world applications are I/O bound: Most server applications spend time waiting for disk/network, not computing
  2. Thread pools have costs: Understanding the overhead helps you make architectural decisions
  3. Production runtimes use these patterns: Tokio, ASIO, libuv all work this way
  4. io_uring is the future: But understanding thread-based async helps you appreciate io_uring’s improvements
  5. Debugging async I/O is hard: Knowing the architecture helps you trace problems

Historical Context

The evolution of async I/O in systems programming:

1990s: select/poll

  • Wait on multiple file descriptors
  • Limited to network sockets, not files
  • O(n) per call, poor scalability

2000s: epoll/kqueue

  • O(1) event notification
  • Still network-focused
  • libevent, libev emerge

2010s: libuv/ASIO

  • Thread pool for file I/O
  • Event loop for network I/O
  • Node.js popularizes the pattern

2019+: io_uring

  • True async for everything
  • Shared memory ring buffers
  • Near-zero syscall overhead

Timeline of Async I/O Patterns:

1983  select()     - 4.2BSD
1987  poll()       - System V Release 3 (standardized later in POSIX.1-2001)
1994  IOCP         - Windows NT 3.5
2000  kqueue()     - FreeBSD 4.1
2002  epoll()      - Linux 2.5.44
2008  Boost.Asio   - Boost 1.35, also available standalone
2011  libuv        - Node.js project (replacing libev/libeio)
2019  io_uring     - Linux 5.1
2020  C++20        - Coroutines standardized

Common Misconceptions

Misconception 1: “Async file I/O doesn’t need threads” Reality: On most systems, there is no practical, portable async API for regular files, so the “async” is achieved by moving blocking calls to worker threads. Of the approaches covered here, only io_uring and IOCP provide kernel-level async file I/O.

Misconception 2: “O_NONBLOCK makes file I/O async” Reality: O_NONBLOCK only affects descriptors that can report readiness, such as sockets, pipes, FIFOs, and terminals. For regular files, read() and write() ignore the flag and can still block on disk access.
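
The difference is easy to observe. The following is a small sketch, assuming a POSIX system with a writable /tmp; both function names are hypothetical. The first reads from an empty pipe in non-blocking mode, the second reads from a regular file opened with the same flag.

```cpp
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>

#include <cerrno>

// Reads from an empty pipe whose read end is O_NONBLOCK.
// Returns the errno reported by read(), or 0 if read() did not fail.
int errno_from_empty_nonblocking_pipe() {
    int fds[2];
    if (::pipe(fds) != 0) return -1;
    int flags = ::fcntl(fds[0], F_GETFL);
    ::fcntl(fds[0], F_SETFL, flags | O_NONBLOCK);

    char byte;
    ssize_t n = ::read(fds[0], &byte, 1);  // nothing was written to the pipe
    int err = (n < 0) ? errno : 0;

    ::close(fds[0]);
    ::close(fds[1]);
    return err;
}

// Creates a one-byte temporary file, reopens it with O_NONBLOCK, and reads it.
// Returns the result of read(): the flag does not turn this into an
// EAGAIN-style call, the kernel simply performs the read.
ssize_t bytes_from_nonblocking_regular_file() {
    char path[] = "/tmp/nonblock_demo_XXXXXX";
    int wfd = ::mkstemp(path);
    if (wfd < 0) return -1;
    ssize_t written = ::write(wfd, "x", 1);
    ::close(wfd);

    ssize_t n = -1;
    int fd = ::open(path, O_RDONLY | O_NONBLOCK);
    if (fd >= 0) {
        char byte;
        if (written == 1) n = ::read(fd, &byte, 1);
        ::close(fd);
    }
    ::unlink(path);
    return n;
}
```

On the pipe the call fails with EAGAIN; on the regular file it returns the data. With a cold cache on a slow disk, that second read would simply block until the data arrives.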

Misconception 3: “Async I/O is always faster” Reality: For single sequential reads, sync I/O is faster (no thread pool overhead). Async I/O shines when:

  • You have many concurrent operations
  • You can overlap I/O with computation
  • You need to maintain responsiveness

Misconception 4: “The thread pool size should match CPU cores” Reality: I/O thread pools should be sized based on expected I/O concurrency, not CPU cores. A 4-core machine might need 32+ I/O threads if doing many parallel file operations.


Project Specification

What You Will Build

An async file I/O library that provides:

  1. AsyncFile class with co_await-able read() and write() methods
  2. spawn_blocking() function to offload any synchronous work to a thread pool
  3. Integration with the Runtime from Project 10
  4. Proper error handling with errno-to-exception mapping
  5. RAII-compliant file handle management

API Design

// Open modes
enum class OpenMode {
    Read       = 0x01,
    Write      = 0x02,
    Create     = 0x04,
    Truncate   = 0x08,
    Append     = 0x10
};

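// Allow bitwise OR (defined inline: a constexpr function needs its body visible at the call site)
constexpr OpenMode operator|(OpenMode a, OpenMode b) {
    return static_cast<OpenMode>(static_cast<int>(a) | static_cast<int>(b));
}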

// The async file handle
class AsyncFile {
public:
    // Returns Task that yields AsyncFile when file is opened
    static Task<AsyncFile> open(Runtime& runtime,
                                std::string path,
                                OpenMode mode);

    // Read into buffer, returns bytes read (0 = EOF)
    Task<size_t> read(std::span<char> buffer);

    // Write from buffer, returns bytes written
    Task<size_t> write(std::span<const char> buffer);

    // Seek to position
    Task<off_t> seek(off_t offset, int whence = SEEK_SET);

    // Get current file size
    Task<size_t> size();

    // Explicit async close (the destructor falls back to a synchronous ::close())
    Task<void> close();

    // File descriptor access (for advanced use)
    int fd() const noexcept;

private:
    int fd_;
    Runtime& runtime_;
    bool closed_ = false;
};

// The spawn_blocking primitive
template<typename F>
Task<std::invoke_result_t<F>> spawn_blocking(Runtime& runtime, F&& func);
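
Because OpenMode is a scoped enum, its values do not convert to bool or int implicitly, so testing a bit needs an explicit cast. The sketch below shows one way to combine and test the flags and translate them for open(2). `has_flag` and `to_posix_flags` are hypothetical helpers, not part of the API above.

```cpp
#include <fcntl.h>

#include <type_traits>

enum class OpenMode {
    Read     = 0x01,
    Write    = 0x02,
    Create   = 0x04,
    Truncate = 0x08,
    Append   = 0x10
};

constexpr OpenMode operator|(OpenMode a, OpenMode b) {
    using U = std::underlying_type_t<OpenMode>;
    return static_cast<OpenMode>(static_cast<U>(a) | static_cast<U>(b));
}

// Hypothetical helper: true if `flag` is set in `mode`.
constexpr bool has_flag(OpenMode mode, OpenMode flag) {
    using U = std::underlying_type_t<OpenMode>;
    return (static_cast<U>(mode) & static_cast<U>(flag)) != 0;
}

// Hypothetical helper: translate an OpenMode combination into open(2) flags.
inline int to_posix_flags(OpenMode mode) {
    int flags = O_RDONLY;
    if (has_flag(mode, OpenMode::Read) && has_flag(mode, OpenMode::Write)) {
        flags = O_RDWR;
    } else if (has_flag(mode, OpenMode::Write)) {
        flags = O_WRONLY;
    }
    if (has_flag(mode, OpenMode::Create))   flags |= O_CREAT;
    if (has_flag(mode, OpenMode::Truncate)) flags |= O_TRUNC;
    if (has_flag(mode, OpenMode::Append))   flags |= O_APPEND;
    return flags;
}
```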

Functional Requirements

  1. File Opening
    • Open files with various mode combinations
    • Handle file not found, permission denied, etc.
    • Create files when OpenMode::Create is specified
  2. Reading
    • Read up to N bytes into a buffer
    • Return actual bytes read (may be less than requested)
    • Return 0 on EOF
    • Handle partial reads correctly
  3. Writing
    • Write from buffer to file
    • Handle partial writes correctly
    • Support append mode
  4. Error Handling
    • Convert errno to meaningful exceptions
    • Propagate exceptions through coroutine suspension
    • Clean up resources on error
  5. Resource Management
    • Close file descriptors when AsyncFile is destroyed
    • Handle close() being called multiple times
    • Prevent use-after-close
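
The resource management requirements can be prototyped separately from the coroutine code. The following is a minimal sketch of a move-only descriptor owner; `UniqueFd` is a hypothetical helper type (AsyncFile could embed something like it, or implement the same logic directly).

```cpp
#include <fcntl.h>
#include <unistd.h>

#include <utility>

// Hypothetical helper: owns one file descriptor and closes it exactly once.
class UniqueFd {
public:
    UniqueFd() noexcept = default;
    explicit UniqueFd(int fd) noexcept : fd_(fd) {}

    UniqueFd(const UniqueFd&) = delete;
    UniqueFd& operator=(const UniqueFd&) = delete;

    // Moving transfers ownership and leaves the source empty (-1).
    UniqueFd(UniqueFd&& other) noexcept : fd_(std::exchange(other.fd_, -1)) {}
    UniqueFd& operator=(UniqueFd&& other) noexcept {
        if (this != &other) {
            reset();
            fd_ = std::exchange(other.fd_, -1);
        }
        return *this;
    }

    ~UniqueFd() { reset(); }

    int get() const noexcept { return fd_; }
    bool is_open() const noexcept { return fd_ >= 0; }

    // Idempotent close. Returns false if ::close() reported an error.
    // The descriptor is forgotten either way: on Linux it is released even
    // when close() fails, so calling close() again could hit a reused fd.
    bool reset() noexcept {
        if (fd_ < 0) return true;
        int rc = ::close(fd_);
        fd_ = -1;
        return rc == 0;
    }

private:
    int fd_ = -1;
};
```

Checking `is_open()` at the top of every operation, as AsyncFile::read() and write() do later in this guide, is what turns use-after-close into a clean error instead of I/O on an unrelated descriptor.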

Real World Outcome

When complete, you can write code like this:

Task<void> copy_file(Runtime& runtime, std::string src, std::string dst) {
    auto in = co_await AsyncFile::open(runtime, src, OpenMode::Read);
    auto out = co_await AsyncFile::open(runtime, dst,
                                        OpenMode::Write | OpenMode::Create | OpenMode::Truncate);

    std::vector<char> buffer(64 * 1024);  // 64KB buffer

    while (true) {
        size_t bytes_read = co_await in.read(buffer);
        if (bytes_read == 0) break;  // EOF

        std::span<const char> data(buffer.data(), bytes_read);
        while (!data.empty()) {
            size_t bytes_written = co_await out.write(data);
            data = data.subspan(bytes_written);
        }
    }

    std::cout << "Copy complete!\n";
}

Task<void> process_large_file(Runtime& runtime, std::string path) {
    auto file = co_await AsyncFile::open(runtime, path, OpenMode::Read);

    std::vector<char> buffer(1024 * 1024);  // 1MB buffer
    size_t total = 0;
    size_t line_count = 0;

    while (true) {
        size_t bytes = co_await file.read(buffer);
        if (bytes == 0) break;

        total += bytes;

        // Count lines in this chunk
        for (size_t i = 0; i < bytes; ++i) {
            if (buffer[i] == '\n') ++line_count;
        }
    }

    std::cout << "Processed " << total << " bytes, "
              << line_count << " lines\n";
}

int main() {
    Runtime runtime(0, 4);  // default async executor, 4 I/O threads

    runtime.block_on(copy_file(runtime, "large.bin", "copy.bin"));
    runtime.block_on(process_large_file(runtime, "data.txt"));

    return 0;
}

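Expected output (illustrative; worker IDs, timings, and throughput depend on your machine and on the Phase 5 overlap optimizations):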

$ ./async_file_demo

[Runtime] Starting with 4 worker threads
[Worker-1] Starting copy_file
[Worker-2] async_open: opening 'large.bin'
[Worker-2] async_open: success, fd=3
[Worker-3] async_open: opening 'copy.bin' (create)
[Worker-3] async_open: success, fd=4
[Worker-1] async_read: queued for fd=3, size=65536
[Worker-2] async_read: completed 65536 bytes
[Worker-1] async_write: queued for fd=4, size=65536
[Worker-3] async_write: completed 65536 bytes
... (continues, interleaving reads and writes)
[Worker-1] async_read: completed 0 bytes (EOF)
Copy complete!

Copying 10GB file...
  Elapsed: 45 seconds
  Throughput: 227 MB/s
  I/O operations: 163,840 reads, 163,840 writes

Comparison with sync implementation:
  Synchronous copy: 52 seconds
  Async with overlap: 45 seconds (15% faster due to I/O overlap)

[Worker-1] Starting process_large_file
[Worker-2] async_open: opening 'data.txt'
[Worker-2] async_open: success, fd=5
... (processing)
Processed 1073741824 bytes, 15000000 lines

[Runtime] Shutting down
[Runtime] All tasks complete
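
The operation count and throughput in the sample output can be cross-checked with a few lines of arithmetic. This sketch assumes "10GB" means 10 GiB and "MB/s" means MiB/s; the helper names are hypothetical.

```cpp
#include <cstdint>

constexpr std::uint64_t KiB = 1024;
constexpr std::uint64_t MiB = 1024 * KiB;
constexpr std::uint64_t GiB = 1024 * MiB;

// Number of read() calls that return data when a file is consumed with a
// fixed-size buffer (ceiling division). The final read that returns 0 at
// EOF is not counted.
constexpr std::uint64_t data_reads(std::uint64_t file_size,
                                   std::uint64_t buffer_size) {
    return (file_size + buffer_size - 1) / buffer_size;
}

// Average throughput in MiB per second.
constexpr double throughput_mib_per_s(std::uint64_t bytes, double seconds) {
    return (static_cast<double>(bytes) / static_cast<double>(MiB)) / seconds;
}

// 10 GiB copied through a 64 KiB buffer needs 163,840 data reads.
static_assert(data_reads(10 * GiB, 64 * KiB) == 163840);
```

With these definitions, 10 GiB in 45 seconds works out to roughly 227.6 MiB/s and 52 seconds to roughly 196.9 MiB/s, which is where the "15% faster" figure comes from when it is read as a throughput ratio.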

Solution Architecture

High-Level Design

┌─────────────────────────────────────────────────────────────────────────────┐
│                           User Coroutine Layer                               │
│                                                                              │
│    Task<void> copy_file() {                                                  │
│        auto f = co_await AsyncFile::open(...);                               │
│        auto bytes = co_await f.read(buffer);     ◄── Suspends here           │
│        co_await f.write(data);                   ◄── And here                │
│    }                                                                         │
└────────────────────────────────────────┬────────────────────────────────────┘
                                         │
                                         │ Task<T>
                                         ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                            AsyncFile Class                                   │
│                                                                              │
│    ┌────────────────────────────────────────────────────────────────────┐   │
│    │  Task<size_t> read(buffer) {                                       │   │
│    │      return spawn_blocking([=] {                                   │   │
│    │          return ::read(fd_, buffer.data(), buffer.size());         │   │
│    │      });                                                           │   │
│    │  }                                                                 │   │
│    └────────────────────────────────────────────────────────────────────┘   │
│                                                                              │
└────────────────────────────────────────┬────────────────────────────────────┘
                                         │
                                         │ spawn_blocking()
                                         ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                          Runtime (from Project 10)                           │
│                                                                              │
│    ┌──────────────────┐      ┌──────────────────────────────────────────┐   │
│    │  Async Executor  │      │           I/O Thread Pool                 │   │
│    │                  │      │                                           │   │
│    │  Coroutine Queue │◄────►│   ┌─────────┐ ┌─────────┐ ┌─────────┐    │   │
│    │                  │      │   │ Worker1 │ │ Worker2 │ │ Worker3 │    │   │
│    │  Ready tasks get │      │   │ blocked │ │  idle   │ │ blocked │    │   │
│    │  resumed here    │      │   │ on I/O  │ │         │ │ on I/O  │    │   │
│    └──────────────────┘      │   └─────────┘ └─────────┘ └─────────┘    │   │
│                              └──────────────────────────────────────────┘   │
│                                                                              │
└────────────────────────────────────────┬────────────────────────────────────┘
                                         │
                                         │ Blocking syscalls
                                         ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                            Operating System                                  │
│                                                                              │
│    ┌──────────────────────────────────────────────────────────────────────┐ │
│    │  Kernel                                                              │ │
│    │                                                                      │ │
│    │    read(fd, buf, n)  ──────►  Disk Controller  ──────►  Disk        │ │
│    │                                                                      │ │
│    │    Thread is blocked until disk responds                            │ │
│    └──────────────────────────────────────────────────────────────────────┘ │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

The spawn_blocking Flow

User Coroutine                     Runtime                        I/O Worker Thread
       │                             │                                   │
       │ co_await spawn_blocking(f)  │                                   │
       │                             │                                   │
       ├────────► queue(f) ─────────►│                                   │
       │                             │                                   │
   [suspend]                         │ ◄──── worker dequeues task ───────┤
       │                             │                                   │
       │                             │       execute f()                 │
       │                             │           │                       │
       │                             │       [blocking I/O]              │
       │                             │           │                       │
       │                             │       result = f()                │
       │                             │           │                       │
       │ ◄────── resume with result ─┼───────────┘                       │
       │                             │                                   │
   [continue]                        │                                   │
       │                             │                                   │
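
The queue, dequeue, execute, and hand-back steps in this flow can be sketched without coroutines. In the sketch below, `MiniIoPool` and `run_on_pool` are hypothetical names, and the caller blocks on a future where the real implementation would suspend and later resume a coroutine.

```cpp
#include <condition_variable>
#include <cstddef>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool: workers pull jobs from one shared queue.
class MiniIoPool {
public:
    explicit MiniIoPool(std::size_t threads) {
        for (std::size_t i = 0; i < threads; ++i) {
            workers_.emplace_back([this] { worker_loop(); });
        }
    }

    ~MiniIoPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        cv_.notify_all();
        for (auto& worker : workers_) worker.join();
    }

    void submit(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobs_.push(std::move(job));
        }
        cv_.notify_one();
    }

private:
    void worker_loop() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stopping_ || !jobs_.empty(); });
                if (jobs_.empty()) return;  // stopping and queue drained
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            job();  // run outside the lock so other workers can dequeue
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> jobs_;
    bool stopping_ = false;
    std::vector<std::thread> workers_;
};

// Queue f on the pool and wait for its result (non-void results only).
// Exceptions thrown by f travel back to the caller through the future.
template <typename F>
auto run_on_pool(MiniIoPool& pool, F f) -> decltype(f()) {
    using R = decltype(f());
    auto promise = std::make_shared<std::promise<R>>();
    std::future<R> future = promise->get_future();
    pool.submit([promise, f = std::move(f)]() mutable {
        try {
            promise->set_value(f());
        } catch (...) {
            promise->set_exception(std::current_exception());
        }
    });
    return future.get();
}
```

The shared_ptr around the promise is there because std::function requires a copyable callable, and std::promise is move-only.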

Key Components

Component         Responsibility                         Key Design Decisions
AsyncFile         File handle with async operations      Owns fd, holds Runtime reference, RAII close
spawn_blocking()  Offload sync work to thread pool       Returns Task, bridges sync/async
BlockingTask      Awaitable for spawn_blocking result    Wraps promise/future for thread communication
Runtime           Manages async executor + I/O pool      Separate pools for async tasks vs blocking I/O
IOError           Exception for file operation failures  Maps errno to descriptive messages

Data Structures

// Error handling
class IOError : public std::runtime_error {
public:
    IOError(const std::string& operation, int error_code);

    int error_code() const noexcept { return error_code_; }
    const char* operation() const noexcept { return operation_.c_str(); }

private:
    std::string operation_;
    int error_code_;
};

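// The blocking task awaitable
template<typename T>
class BlockingTask {
    // std::optional<void> is ill-formed, so void results are stored as std::monostate
    using Stored = std::conditional_t<std::is_void_v<T>, std::monostate, T>;

public:
    BlockingTask(Runtime& runtime, std::function<T()> work);

    // The work is only submitted in await_suspend, so this is false on the first await
    bool await_ready() const noexcept { return completed_.load(); }

    void await_suspend(std::coroutine_handle<> continuation) {
        continuation_ = continuation;
        // Runs on an I/O worker. `this` stays valid because the awaiter lives
        // in the suspended coroutine's frame until await_resume() returns.
        runtime_.submit_to_io_pool([this] {
            try {
                if constexpr (std::is_void_v<T>) {
                    work_();
                    result_.emplace();
                } else {
                    result_.emplace(work_());
                }
            } catch (...) {
                exception_ = std::current_exception();
            }
            // Copy what is needed first: once the continuation is scheduled,
            // the coroutine may resume and destroy this awaiter.
            Runtime& runtime = runtime_;
            std::coroutine_handle<> handle = continuation_;
            completed_.store(true);
            runtime.schedule(handle);
        });
    }

    T await_resume() {
        if (exception_) {
            std::rethrow_exception(exception_);
        }
        if constexpr (!std::is_void_v<T>) {
            return std::move(*result_);
        }
    }

private:
    Runtime& runtime_;
    std::function<T()> work_;
    std::coroutine_handle<> continuation_;
    std::atomic<bool> completed_{false};
    std::optional<Stored> result_;
    std::exception_ptr exception_;
};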

// File metadata for debugging/stats
struct FileStats {
    size_t total_reads = 0;
    size_t total_writes = 0;
    size_t bytes_read = 0;
    size_t bytes_written = 0;
    std::chrono::steady_clock::time_point opened_at;
};

// The async file class
class AsyncFile {
public:
    static Task<AsyncFile> open(Runtime& runtime,
                                std::string path,
                                OpenMode mode);

    AsyncFile(AsyncFile&& other) noexcept;
    AsyncFile& operator=(AsyncFile&& other) noexcept;
    ~AsyncFile();

    // Delete copy operations (file handles are unique)
    AsyncFile(const AsyncFile&) = delete;
    AsyncFile& operator=(const AsyncFile&) = delete;

    Task<size_t> read(std::span<char> buffer);
    Task<size_t> write(std::span<const char> buffer);
    Task<off_t> seek(off_t offset, int whence = SEEK_SET);
    Task<size_t> size();
    Task<void> close();

    int fd() const noexcept { return fd_; }
    bool is_open() const noexcept { return fd_ >= 0 && !closed_; }
    const FileStats& stats() const noexcept { return stats_; }

private:
    AsyncFile(int fd, Runtime& runtime);

    int fd_;
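    Runtime& runtime_;  // A reference cannot be rebound: move assignment only makes sense between files on the same Runtime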
    bool closed_ = false;
    FileStats stats_;
};

Error Handling Strategy

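// errno to exception mapping
// Call this immediately after the failing syscall, before anything can overwrite errno.
inline IOError make_io_error(const std::string& operation) {
    int err = errno;
    // IOError's constructor appends strerror(err) itself, so pass only the operation name
    return IOError(operation, err);
}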

// Common error codes and their meanings
inline const char* describe_errno(int err) {
    switch (err) {
        case ENOENT: return "file not found";
        case EACCES: return "permission denied";
        case EEXIST: return "file exists (with O_EXCL)";
        case ENOSPC: return "no space left on device";
        case EDQUOT: return "disk quota exceeded";
        case EIO:    return "I/O error (hardware failure)";
        case EINTR:  return "interrupted (should retry)";
        case EAGAIN: return "would block (should not happen for regular files)";
        default:     return "other error";
    }
}
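
EINTR is the one code in this list that callers usually handle by retrying rather than reporting. The following is a minimal sketch of that policy; `read_retrying` is a hypothetical helper, and it throws std::system_error instead of IOError only to stay self-contained.

```cpp
#include <unistd.h>

#include <cerrno>
#include <cstddef>
#include <system_error>

// Hypothetical helper: read() that retries when interrupted by a signal and
// throws for every other error. Returns the number of bytes read (0 = EOF).
std::size_t read_retrying(int fd, void* buffer, std::size_t count) {
    for (;;) {
        ssize_t n = ::read(fd, buffer, count);
        if (n >= 0) {
            return static_cast<std::size_t>(n);
        }
        if (errno == EINTR) {
            continue;  // interrupted before any data was transferred
        }
        throw std::system_error(errno, std::generic_category(), "read");
    }
}
```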

Implementation Guide

Development Environment Setup

# Requirements: C++20 compiler with coroutine support
# GCC 10+ or Clang 14+ recommended

# Check compiler version
g++ --version  # Need GCC 10+
# or
clang++ --version  # Need Clang 14+

# Create project structure
mkdir -p async-file-io/{src,include,tests,examples}
cd async-file-io

# Create CMakeLists.txt
cat > CMakeLists.txt << 'EOF'
cmake_minimum_required(VERSION 3.16)
project(async_file_io CXX)

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# Coroutine support: GCC 10 needs -fcoroutines (harmless on newer GCC).
# Clang 14+ needs no extra flag with -std=c++20; -fcoroutines-ts targeted the
# old Coroutines TS and was removed in Clang 17.
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
    add_compile_options(-fcoroutines)
endif()

# Thread support
find_package(Threads REQUIRED)

add_library(async_file_io
    src/runtime.cpp
    src/async_file.cpp
    src/io_error.cpp
)
target_include_directories(async_file_io PUBLIC include)
target_link_libraries(async_file_io Threads::Threads)

# Examples
add_executable(file_copy examples/file_copy.cpp)
target_link_libraries(file_copy async_file_io)

add_executable(file_process examples/file_process.cpp)
target_link_libraries(file_process async_file_io)

# Tests
enable_testing()
add_executable(test_async_file tests/test_async_file.cpp)
target_link_libraries(test_async_file async_file_io)
add_test(NAME async_file_tests COMMAND test_async_file)
EOF

Project Structure

async-file-io/
├── include/
│   ├── async_file.hpp       # AsyncFile class
│   ├── runtime.hpp          # Runtime with I/O pool (from Project 10, extended)
│   ├── task.hpp             # Task<T> coroutine type (from Project 10)
│   ├── io_error.hpp         # IOError exception class
│   └── spawn_blocking.hpp   # spawn_blocking implementation
├── src/
│   ├── async_file.cpp       # AsyncFile implementation
│   ├── runtime.cpp          # Runtime implementation
│   └── io_error.cpp         # IOError implementation
├── examples/
│   ├── file_copy.cpp        # Copy file demo
│   ├── file_process.cpp     # Process large file demo
│   └── concurrent_io.cpp    # Multiple concurrent file operations
├── tests/
│   ├── test_async_file.cpp  # Unit tests
│   ├── test_spawn_blocking.cpp
│   └── test_error_handling.cpp
├── CMakeLists.txt
└── README.md

Implementation Phases

Phase 1: spawn_blocking Foundation (Days 1-3)

Goals:

  • Extend Runtime with an I/O thread pool
  • Implement spawn_blocking primitive
  • Test basic offloading

Key Code:

// spawn_blocking.hpp
namespace detail {

// The callable is taken BY VALUE so it is moved into the coroutine frame.
// A reference parameter could dangle if the Task is awaited after the
// caller's temporary lambda has been destroyed.
template<typename R, typename F>
Task<R> spawn_blocking_impl(Runtime& runtime, F func) {
    // BlockingTask (see Data Structures) submits the work to the I/O pool in
    // await_suspend and reschedules this coroutine when the worker finishes.
    // Exceptions thrown by func are rethrown from await_resume.
    BlockingTask<R> blocking(runtime, std::move(func));
    co_return co_await blocking;
}

}  // namespace detail

template<typename F>
auto spawn_blocking(Runtime& runtime, F&& func)
    -> Task<std::invoke_result_t<F>>
{
    using ReturnType = std::invoke_result_t<F>;
    // Not a coroutine itself: it copies or moves the callable right away and
    // forwards it to the coroutine above.
    return detail::spawn_blocking_impl<ReturnType>(
        runtime, std::decay_t<F>(std::forward<F>(func)));
}

// In runtime.hpp - add I/O pool
class Runtime {
public:
    explicit Runtime(size_t async_threads = 0, size_t io_threads = 4);

    // Submit to async executor (for coroutines)
    void schedule(std::coroutine_handle<> handle);

    // Submit to I/O thread pool (for blocking work)
    template<typename F>
    void submit_to_io_pool(F&& func);

    // Run until task completes
    template<typename T>
    T block_on(Task<T> task);

private:
    // Async coroutine executor
    std::queue<std::coroutine_handle<>> ready_queue_;
    std::mutex queue_mutex_;
    std::condition_variable queue_cv_;
    std::vector<std::thread> async_workers_;

    // Blocking I/O thread pool
    std::queue<std::function<void()>> io_queue_;
    std::mutex io_mutex_;
    std::condition_variable io_cv_;
    std::vector<std::thread> io_workers_;

    std::atomic<bool> shutdown_{false};
};

Testing checkpoint:

Task<int> test_spawn_blocking(Runtime& runtime) {
    int result = co_await spawn_blocking(runtime, [] {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        return 42;
    });
    assert(result == 42);
    co_return result;
}

Phase 2: Basic AsyncFile (Days 4-7)

Goals:

  • Implement AsyncFile::open()
  • Implement read() and write()
  • Handle basic error cases

Key Code:

// async_file.cpp
Task<AsyncFile> AsyncFile::open(Runtime& runtime,
                                std::string path,
                                OpenMode mode) {
    // OpenMode is a scoped enum, so test bits through the underlying integer
    auto has = [mode](OpenMode flag) {
        return (static_cast<int>(mode) & static_cast<int>(flag)) != 0;
    };

    // Convert OpenMode to POSIX flags
    int flags = 0;
    if (has(OpenMode::Read) && has(OpenMode::Write)) {
        flags = O_RDWR;
    } else if (has(OpenMode::Write)) {
        flags = O_WRONLY;
    } else {
        flags = O_RDONLY;
    }

    if (has(OpenMode::Create))   flags |= O_CREAT;
    if (has(OpenMode::Truncate)) flags |= O_TRUNC;
    if (has(OpenMode::Append))   flags |= O_APPEND;

    int fd = co_await spawn_blocking(runtime, [path, flags]() {
        int fd = ::open(path.c_str(), flags, 0644);
        if (fd < 0) {
            throw make_io_error("open(" + path + ")");
        }
        return fd;
    });

    co_return AsyncFile(fd, runtime);
}

Task<size_t> AsyncFile::read(std::span<char> buffer) {
    if (!is_open()) {
        throw IOError("read on closed file", EBADF);
    }

    int fd = fd_;  // Capture by value

    ssize_t result = co_await spawn_blocking(runtime_, [fd, buffer]() {
        ssize_t bytes;
        do {
            bytes = ::read(fd, buffer.data(), buffer.size());
        } while (bytes < 0 && errno == EINTR);  // Retry if interrupted by a signal
        if (bytes < 0) {
            throw make_io_error("read");
        }
        return bytes;
    });

    stats_.total_reads++;
    stats_.bytes_read += result;

    co_return static_cast<size_t>(result);
}

Task<size_t> AsyncFile::write(std::span<const char> buffer) {
    if (!is_open()) {
        throw IOError("write on closed file", EBADF);
    }

    int fd = fd_;

    ssize_t result = co_await spawn_blocking(runtime_, [fd, buffer]() {
        ssize_t bytes;
        do {
            bytes = ::write(fd, buffer.data(), buffer.size());
        } while (bytes < 0 && errno == EINTR);  // Retry if interrupted by a signal
        if (bytes < 0) {
            throw make_io_error("write");
        }
        return bytes;
    });

    stats_.total_writes++;
    stats_.bytes_written += result;

    co_return static_cast<size_t>(result);
}

Testing checkpoint:

Task<void> test_read_write(Runtime& runtime) {
    // Write test
    {
        auto file = co_await AsyncFile::open(runtime, "/tmp/test.txt",
            OpenMode::Write | OpenMode::Create | OpenMode::Truncate);
        std::string data = "Hello, async world!";
        size_t written = co_await file.write(std::span(data));
        assert(written == data.size());
    }

    // Read test
    {
        auto file = co_await AsyncFile::open(runtime, "/tmp/test.txt",
            OpenMode::Read);
        std::vector<char> buffer(100);
        size_t read = co_await file.read(buffer);
        std::string result(buffer.data(), read);
        assert(result == "Hello, async world!");
    }
}

Phase 3: Error Handling & RAII (Days 8-10)

Goals:

  • Robust errno handling
  • Proper destructor behavior
  • Exception propagation through coroutines

Key Code:

// io_error.hpp
class IOError : public std::runtime_error {
public:
    IOError(const std::string& message, int error_code)
        : std::runtime_error(format_message(message, error_code))
        , error_code_(error_code) {}

    int error_code() const noexcept { return error_code_; }

    // Convenience methods
    bool is_not_found() const { return error_code_ == ENOENT; }
    bool is_permission_denied() const { return error_code_ == EACCES; }
    bool is_disk_full() const { return error_code_ == ENOSPC; }

private:
    static std::string format_message(const std::string& op, int err) {
        return op + ": " + std::strerror(err) + " (errno=" + std::to_string(err) + ")";
    }

    int error_code_;
};

// async_file.cpp - destructor handling
AsyncFile::~AsyncFile() {
    if (fd_ >= 0 && !closed_) {
        // Cannot co_await in destructor, so do sync close
        // This is a known limitation - user should call close() explicitly
        // for proper async cleanup
        int result = ::close(fd_);
        if (result < 0) {
            // Log error but don't throw from destructor
            std::cerr << "Warning: close() failed in destructor: "
                      << std::strerror(errno) << std::endl;
        }
    }
}

Task<void> AsyncFile::close() {
    if (!is_open()) {
        co_return;  // Already closed
    }

    int fd = fd_;
    fd_ = -1;
    closed_ = true;

    co_await spawn_blocking(runtime_, [fd]() {
        if (::close(fd) < 0) {
            throw make_io_error("close");
        }
    });
}

Testing checkpoint:

Task<void> test_error_handling(Runtime& runtime) {
    // Test file not found
    try {
        auto file = co_await AsyncFile::open(runtime, "/nonexistent/path/file.txt",
            OpenMode::Read);
        assert(false && "Should have thrown");
    } catch (const IOError& e) {
        assert(e.is_not_found());
    }

    // Test permission denied (if not root)
    try {
        auto file = co_await AsyncFile::open(runtime, "/etc/shadow",
            OpenMode::Read);
        // If we got here, we're root - skip test
    } catch (const IOError& e) {
        assert(e.is_permission_denied());
    }
}
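
One detail behind these checks: errno is thread-local, so it has to be read on the thread that made the failing call, before any other library call can overwrite it. The sketch below illustrates that with a hypothetical helper, open_or_throw, of the kind you might pass to spawn_blocking. It uses std::system_error as a stand-in for IOError so that the example is self-contained; neither helper name comes from the guide.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <string>
#include <system_error>
#include <unistd.h>

// Hypothetical helper: the kind of body you would hand to spawn_blocking.
// std::system_error stands in for the guide's IOError.
int open_or_throw(const std::string& path, int flags) {
    int fd = ::open(path.c_str(), flags);
    if (fd < 0) {
        int saved = errno;  // capture immediately, on the calling thread
        throw std::system_error(saved, std::generic_category(), "open " + path);
    }
    return fd;
}

// Returns the errno carried by the exception, or 0 if the open succeeded.
int try_open_errno(const std::string& path) {
    try {
        int fd = open_or_throw(path, O_RDONLY);
        ::close(fd);
        return 0;
    } catch (const std::system_error& e) {
        return e.code().value();
    }
}
```

Because the exception object carries the saved value, the coroutine that catches it after resumption never needs to consult errno on its own thread.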

Phase 4: Full File Copy Implementation (Days 11-12)

Goals:

  • Complete copy_file implementation
  • Handle partial reads/writes
  • Measure performance

Key Code:

// Complete file copy with proper partial write handling
Task<void> copy_file(Runtime& runtime,
                     std::string src,
                     std::string dst,
                     size_t buffer_size = 64 * 1024) {
    auto in = co_await AsyncFile::open(runtime, src, OpenMode::Read);
    auto out = co_await AsyncFile::open(runtime, dst,
        OpenMode::Write | OpenMode::Create | OpenMode::Truncate);

    std::vector<char> buffer(buffer_size);
    size_t total_copied = 0;
    auto start_time = std::chrono::steady_clock::now();

    while (true) {
        size_t bytes_read = co_await in.read(buffer);
        if (bytes_read == 0) break;  // EOF

        // Handle partial writes
        std::span<const char> remaining(buffer.data(), bytes_read);
        while (!remaining.empty()) {
            size_t bytes_written = co_await out.write(remaining);
            remaining = remaining.subspan(bytes_written);
        }

        total_copied += bytes_read;
    }

    auto end_time = std::chrono::steady_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
        end_time - start_time);

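    // Throughput in MB/s; guard against a 0 ms duration on very small files
    double throughput_mbps = duration.count() > 0
        ? (total_copied / 1024.0 / 1024.0) / (duration.count() / 1000.0)
        : 0.0;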

    std::cout << "Copied " << total_copied << " bytes in "
              << duration.count() << "ms ("
              << throughput_mbps << " MB/s)\n";

    co_await in.close();
    co_await out.close();
}

Phase 5: Performance Optimizations (Days 13-14)

Goals:

  • Implement readahead (start next read while processing current)
  • Implement write buffering
  • Benchmark and tune

Readahead Pattern:

// Advanced: Overlapped reads for better throughput
Task<void> copy_file_overlapped(Runtime& runtime,
                                std::string src,
                                std::string dst) {
    auto in = co_await AsyncFile::open(runtime, src, OpenMode::Read);
    auto out = co_await AsyncFile::open(runtime, dst,
        OpenMode::Write | OpenMode::Create | OpenMode::Truncate);

    constexpr size_t BUFFER_SIZE = 64 * 1024;
    std::vector<char> buffer1(BUFFER_SIZE);
    std::vector<char> buffer2(BUFFER_SIZE);

    // Start first read
    size_t bytes_in_buf1 = co_await in.read(buffer1);

    while (bytes_in_buf1 > 0) {
        // Start next read. This overlaps with the write only if Task starts
        // eagerly; a lazy Task would not begin until it is co_awaited below.
        Task<size_t> next_read = in.read(buffer2);

        // Write current buffer
        std::span<const char> remaining(buffer1.data(), bytes_in_buf1);
        while (!remaining.empty()) {
            size_t written = co_await out.write(remaining);
            remaining = remaining.subspan(written);
        }

        // Wait for next read to complete
        bytes_in_buf1 = co_await std::move(next_read);

        // Swap buffers
        std::swap(buffer1, buffer2);
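    }

    // Close explicitly so close() errors surface instead of being
    // swallowed by the destructor's synchronous fallback
    co_await in.close();
    co_await out.close();
}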
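
To experiment with the double-buffering idea outside the coroutine runtime, here is a stand-alone sketch that uses std::async on standard streams. The names copy_double_buffered and read_chunk are hypothetical, and the streams stand in for AsyncFile.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <future>
#include <istream>
#include <ostream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Reads up to buf.size() bytes and returns how many were read (0 at EOF).
static std::size_t read_chunk(std::istream& in, std::vector<char>& buf) {
    in.read(buf.data(), static_cast<std::streamsize>(buf.size()));
    return static_cast<std::size_t>(in.gcount());
}

// Hypothetical stand-alone copy: the next read runs on another thread
// while the current chunk is being written.
std::size_t copy_double_buffered(std::istream& in, std::ostream& out,
                                 std::size_t chunk_size) {
    std::vector<char> current(chunk_size), next(chunk_size);
    std::size_t total = 0;
    std::size_t have = read_chunk(in, current);
    while (have > 0) {
        // std::launch::async makes the read start now, not at get().
        auto pending = std::async(std::launch::async, read_chunk,
                                  std::ref(in), std::ref(next));
        out.write(current.data(), static_cast<std::streamsize>(have));
        total += have;
        have = pending.get();      // wait for the readahead to finish
        std::swap(current, next);  // the freshly filled buffer becomes current
    }
    return total;
}
```

The explicit std::launch::async policy plays the same role as an eagerly started Task: without it, the read could be deferred until get() and nothing would overlap.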

Write Buffering Pattern:

// Buffered writer that batches small writes
class BufferedAsyncWriter {
public:
    BufferedAsyncWriter(AsyncFile& file, size_t buffer_size = 64 * 1024)
        : file_(file), buffer_(buffer_size), used_(0) {}

    Task<void> write(std::span<const char> data) {
        while (!data.empty()) {
            size_t space = buffer_.size() - used_;
            size_t to_copy = std::min(space, data.size());

            std::memcpy(buffer_.data() + used_, data.data(), to_copy);
            used_ += to_copy;
            data = data.subspan(to_copy);

            if (used_ == buffer_.size()) {
                co_await flush();
            }
        }
    }

    Task<void> flush() {
        if (used_ == 0) co_return;

        std::span<const char> remaining(buffer_.data(), used_);
        while (!remaining.empty()) {
            size_t written = co_await file_.write(remaining);
            remaining = remaining.subspan(written);
        }
        used_ = 0;
    }

private:
    AsyncFile& file_;
    std::vector<char> buffer_;
    size_t used_;
};

Testing Strategy

Test Categories

| Category | Purpose | Examples |
|---|---|---|
| Unit Tests | Test individual components | spawn_blocking returns correct value |
| Integration Tests | Test full async flow | File copy works end-to-end |
| Error Tests | Test error handling | File not found throws IOError |
| Stress Tests | Test under load | 1000 concurrent file operations |
| Performance Tests | Measure throughput | Compare to sync implementation |

Critical Test Cases

// Test 1: Basic spawn_blocking
TEST(SpawnBlocking, ReturnsValue) {
    Runtime runtime(2, 4);
    auto result = runtime.block_on(spawn_blocking(runtime, [] { return 42; }));
    EXPECT_EQ(result, 42);
}

// Test 2: spawn_blocking exception propagation
TEST(SpawnBlocking, PropagatesExceptions) {
    Runtime runtime(2, 4);
    EXPECT_THROW({
        runtime.block_on(spawn_blocking(runtime, [] {
            throw std::runtime_error("test error");
            return 0;
        }));
    }, std::runtime_error);
}

// Test 3: Basic file read
TEST(AsyncFile, ReadSmallFile) {
    Runtime runtime(2, 4);

    // Create test file
    std::ofstream("/tmp/test_read.txt") << "Hello, World!";

    auto task = [](Runtime& rt) -> Task<std::string> {
        auto file = co_await AsyncFile::open(rt, "/tmp/test_read.txt", OpenMode::Read);
        std::vector<char> buffer(100);
        size_t bytes = co_await file.read(buffer);
        co_return std::string(buffer.data(), bytes);
    };

    std::string content = runtime.block_on(task(runtime));
    EXPECT_EQ(content, "Hello, World!");
}

// Test 4: Large file copy
TEST(AsyncFile, CopyLargeFile) {
    Runtime runtime(2, 4);

    // Create 10MB test file
    {
        std::ofstream out("/tmp/test_large.bin", std::ios::binary);
        std::vector<char> data(1024 * 1024, 'X');
        for (int i = 0; i < 10; ++i) {
            out.write(data.data(), data.size());
        }
    }

    runtime.block_on(copy_file(runtime, "/tmp/test_large.bin", "/tmp/test_copy.bin"));

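    // Verify both files opened and sizes match (tellg() is -1 on failure,
    // so two failed opens would otherwise compare equal)
    std::ifstream in1("/tmp/test_large.bin", std::ios::binary | std::ios::ate);
    std::ifstream in2("/tmp/test_copy.bin", std::ios::binary | std::ios::ate);
    ASSERT_TRUE(in1.is_open() && in2.is_open());
    EXPECT_EQ(in1.tellg(), in2.tellg());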
}

// Test 5: Concurrent operations
TEST(AsyncFile, ConcurrentReads) {
    Runtime runtime(2, 8);

    // Create test files
    for (int i = 0; i < 10; ++i) {
        std::ofstream("/tmp/test_" + std::to_string(i) + ".txt")
            << "File " << i;
    }

    auto task = [](Runtime& rt) -> Task<void> {
        std::vector<Task<std::string>> tasks;
        for (int i = 0; i < 10; ++i) {
            tasks.push_back([](Runtime& rt, int idx) -> Task<std::string> {
                auto file = co_await AsyncFile::open(rt,
                    "/tmp/test_" + std::to_string(idx) + ".txt", OpenMode::Read);
                std::vector<char> buffer(100);
                size_t bytes = co_await file.read(buffer);
                co_return std::string(buffer.data(), bytes);
            }(rt, i));
        }

        for (int i = 0; i < 10; ++i) {
            std::string content = co_await std::move(tasks[i]);
            EXPECT_EQ(content, "File " + std::to_string(i));
        }
    };

    runtime.block_on(task(runtime));
}

// Test 6: Error handling
TEST(AsyncFile, FileNotFound) {
    Runtime runtime(2, 4);

    auto task = [](Runtime& rt) -> Task<void> {
        try {
            auto file = co_await AsyncFile::open(rt, "/nonexistent/file.txt", OpenMode::Read);
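            // FAIL() expands to a return statement, which is not allowed
            // inside a coroutine; ADD_FAILURE() records the failure instead
            ADD_FAILURE() << "Should have thrown";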
        } catch (const IOError& e) {
            EXPECT_TRUE(e.is_not_found());
        }
    };

    runtime.block_on(task(runtime));
}

// Test 7: EOF handling
TEST(AsyncFile, ReturnsZeroOnEOF) {
    Runtime runtime(2, 4);

    std::ofstream("/tmp/test_eof.txt") << "short";

    auto task = [](Runtime& rt) -> Task<void> {
        auto file = co_await AsyncFile::open(rt, "/tmp/test_eof.txt", OpenMode::Read);

        std::vector<char> buffer(1024);
        size_t bytes1 = co_await file.read(buffer);
        EXPECT_EQ(bytes1, 5u);

        size_t bytes2 = co_await file.read(buffer);
        EXPECT_EQ(bytes2, 0);  // EOF
    };

    runtime.block_on(task(runtime));
}

Performance Benchmarks

void benchmark_file_copy(Runtime& runtime) {
    // Create 1GB test file
    const size_t FILE_SIZE = 1024 * 1024 * 1024;
    {
        std::ofstream out("/tmp/bench_src.bin", std::ios::binary);
        std::vector<char> chunk(1024 * 1024, 'X');
        for (size_t written = 0; written < FILE_SIZE; written += chunk.size()) {
            out.write(chunk.data(), chunk.size());
        }
    }

    // Benchmark sync copy
    auto sync_start = std::chrono::steady_clock::now();
    {
        std::ifstream in("/tmp/bench_src.bin", std::ios::binary);
        std::ofstream out("/tmp/bench_sync.bin", std::ios::binary);
        std::vector<char> buffer(64 * 1024);
        while (in.read(buffer.data(), buffer.size()) || in.gcount() > 0) {
            out.write(buffer.data(), in.gcount());
        }
    }
    auto sync_duration = std::chrono::steady_clock::now() - sync_start;

    // Benchmark async copy
    auto async_start = std::chrono::steady_clock::now();
    runtime.block_on(copy_file(runtime, "/tmp/bench_src.bin", "/tmp/bench_async.bin"));
    auto async_duration = std::chrono::steady_clock::now() - async_start;

    // Benchmark overlapped copy
    auto overlap_start = std::chrono::steady_clock::now();
    runtime.block_on(copy_file_overlapped(runtime, "/tmp/bench_src.bin", "/tmp/bench_overlap.bin"));
    auto overlap_duration = std::chrono::steady_clock::now() - overlap_start;

    std::cout << "Sync:     " << std::chrono::duration_cast<std::chrono::milliseconds>(sync_duration).count() << "ms\n";
    std::cout << "Async:    " << std::chrono::duration_cast<std::chrono::milliseconds>(async_duration).count() << "ms\n";
    std::cout << "Overlapped: " << std::chrono::duration_cast<std::chrono::milliseconds>(overlap_duration).count() << "ms\n";
}

Common Pitfalls & Debugging

Frequent Mistakes

| Pitfall | Symptom | Solution |
|---|---|---|
| Capturing this in a blocking lambda | Use-after-free crash | Capture fd_ by value, not this |
| Forgetting partial writes | Data corruption | Loop until all bytes written |
| Throwing from destructor | Program termination | Log errors, don’t throw |
| Using file after close | EBADF errors | Track closed state, check before operations |
| Thread pool too small | Poor concurrency | Size I/O pool based on expected concurrency |
| Capturing a span whose buffer is destroyed early | Dangling reference | Ensure buffer outlives the operation |
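
The first pitfall in the table can be reproduced without any async machinery. In the sketch below, Handle is a hypothetical stand-in for AsyncFile and std::function stands in for a job queued to the I/O pool.

```cpp
#include <cassert>
#include <functional>

struct Handle {  // hypothetical stand-in for AsyncFile
    int fd_;

    // Risky: the closure holds `this`. If the Handle is destroyed or moved
    // before the job runs, reading fd_ is a use-after-free.
    std::function<bool()> job_capturing_this() {
        return [this] { return fd_ >= 0; };
    }

    // Safe: the closure owns its own copy of the descriptor number.
    std::function<int()> job_capturing_fd() {
        int fd = fd_;
        return [fd] { return fd; };
    }
};

// The Handle dies at the end of this function, but the returned job
// does not refer to it and is still valid to call.
std::function<int()> make_job_then_destroy_handle(int fd) {
    Handle h{fd};
    return h.job_capturing_fd();
}
```

Copying the descriptor only protects against the object going away. The descriptor itself still has to stay open until the job has run, which is why close() above hands the fd to the blocking job rather than closing it first.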

Debugging Strategies

1. Trace spawn_blocking flow:

template<typename F>
auto spawn_blocking(Runtime& runtime, F&& func) -> Task<std::invoke_result_t<F>> {
    std::cerr << "[spawn_blocking] Queueing work on thread "
              << std::this_thread::get_id() << "\n";

    // ... implementation ...

    std::cerr << "[spawn_blocking] Resuming on thread "
              << std::this_thread::get_id() << "\n";
}

2. Track file descriptor lifecycle:

AsyncFile::AsyncFile(int fd, Runtime& runtime)
    : fd_(fd), runtime_(runtime) {
    std::cerr << "[AsyncFile] Opened fd=" << fd_ << "\n";
}

AsyncFile::~AsyncFile() {
    std::cerr << "[AsyncFile] Destructor fd=" << fd_
              << " closed=" << closed_ << "\n";
}

3. Verify thread usage:

Task<size_t> AsyncFile::read(std::span<char> buffer) {
    auto caller_thread = std::this_thread::get_id();
    int fd = fd_;  // Copy the descriptor; do not capture this

    ssize_t result = co_await spawn_blocking(runtime_, [fd, buffer, caller_thread]() {
        assert(std::this_thread::get_id() != caller_thread
               && "I/O should happen on different thread!");
        ssize_t n = ::read(fd, buffer.data(), buffer.size());
        if (n < 0) {
            throw make_io_error("read");  // Capture errno on the I/O thread
        }
        return n;
    });

    // Should be back on async thread (or original)
    co_return static_cast<size_t>(result);
}

4. Use AddressSanitizer for memory issues:

cmake -DCMAKE_CXX_FLAGS="-fsanitize=address -g" ..

5. Use ThreadSanitizer for race conditions:

cmake -DCMAKE_CXX_FLAGS="-fsanitize=thread -g" ..

Performance Issues

| Issue | Symptom | Fix |
|---|---|---|
| Too many small I/Os | High CPU, low throughput | Use larger buffers, buffered writer |
| I/O pool contention | High latency variance | Increase pool size |
| Thread creation overhead | Slow startup | Pre-create thread pool |
| Memory allocation in hot path | Poor cache performance | Reuse buffers |
| Frequent coroutine suspensions | Overhead | Batch operations |

Extensions & Challenges

Extension 1: io_uring Integration (Linux 5.1+)

Replace the thread pool with true kernel async I/O:

class IOURingRuntime {
public:
    IOURingRuntime(unsigned entries = 256);

    // Submit read operation
    Task<size_t> async_read(int fd, std::span<char> buffer, off_t offset);

    // Submit write operation
    Task<size_t> async_write(int fd, std::span<const char> buffer, off_t offset);

    // Process completions
    void poll();

private:
    struct io_uring ring_;
    std::unordered_map<uint64_t, std::coroutine_handle<>> pending_;
};

Key concepts:

  • Submission Queue (SQ): User submits I/O requests
  • Completion Queue (CQ): Kernel posts completions
  • No thread pool needed - kernel handles async
  • Registered buffers let the kernel skip per-request buffer mapping; combined with O_DIRECT this makes zero-copy I/O possible

Extension 2: Memory-Mapped File Support

class AsyncMappedFile {
public:
    static Task<AsyncMappedFile> open(Runtime& runtime,
                                      std::string path,
                                      size_t size = 0);

    // Get a span to mapped memory (sync access, no copy)
    std::span<char> data();

    // Async sync to disk
    Task<void> sync(bool async = true);

private:
    void* mapping_;
    size_t size_;
    int fd_;
};
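
As a starting point for the blocking work behind such a class, the sketch below maps a temporary file, writes through the mapping, flushes with msync, and reads the bytes back through the descriptor. The function name mmap_roundtrip is hypothetical. One plausible reading of sync(bool async), which is an assumption and not something the interface states, is that it selects between MS_ASYNC and MS_SYNC.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <fcntl.h>
#include <string>
#include <sys/mman.h>
#include <unistd.h>

// Hypothetical blocking helper: returns what read-back through the fd saw,
// or an empty string on any failure.
std::string mmap_roundtrip(const std::string& text) {
    char path[] = "/tmp/mmap_demo_XXXXXX";
    int fd = ::mkstemp(path);
    if (fd < 0) return {};
    ::unlink(path);  // the file lives until fd is closed

    const std::size_t size = 4096;
    std::string result;
    if (text.size() <= size && ::ftruncate(fd, static_cast<off_t>(size)) == 0) {
        void* p = ::mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
        if (p != MAP_FAILED) {
            std::memcpy(p, text.data(), text.size());  // plain memory write
            // MS_SYNC blocks until write-back completes; MS_ASYNC only schedules it.
            if (::msync(p, size, MS_SYNC) == 0) {
                result.resize(text.size());
                ssize_t n = ::pread(fd, result.data(), result.size(), 0);
                if (n != static_cast<ssize_t>(result.size())) result.clear();
            }
            ::munmap(p, size);
        }
    }
    ::close(fd);
    return result;
}
```

Note that touching a mapped page can still block on a page fault, so data() being synchronous does not make every access free of I/O latency.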

Extension 3: Directory Operations

Task<std::vector<std::string>> readdir(Runtime& runtime, std::string path);
Task<void> mkdir(Runtime& runtime, std::string path, mode_t mode = 0755);
Task<void> remove(Runtime& runtime, std::string path);
Task<void> rename(Runtime& runtime, std::string from, std::string to);
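
Each of these can be a thin wrapper that runs a blocking call through spawn_blocking. As one possible sketch of the blocking body for readdir, here is a version built on std::filesystem. The name list_dir_blocking and the choice to sort the result are assumptions, not part of the interface above.

```cpp
#include <algorithm>
#include <cassert>
#include <filesystem>
#include <fstream>
#include <string>
#include <vector>

namespace fs = std::filesystem;

// Hypothetical blocking body for readdir(): entry names, sorted.
// fs::directory_iterator throws fs::filesystem_error on failure, which
// would propagate through the Task like any other exception.
std::vector<std::string> list_dir_blocking(const fs::path& dir) {
    std::vector<std::string> names;
    for (const auto& entry : fs::directory_iterator(dir)) {
        names.push_back(entry.path().filename().string());
    }
    std::sort(names.begin(), names.end());  // iteration order is unspecified
    return names;
}
```

If you want IOError everywhere, catch fs::filesystem_error inside the blocking job and translate its code() there.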

Extension 4: Watching for File Changes

class FileWatcher {
public:
    Task<void> watch(Runtime& runtime, std::string path);

    // Returns when file changes
    Task<FileEvent> next_event();

private:
    int inotify_fd_;
    Runtime& runtime_;
};

Extension 5: Async Network I/O

Extend the library to support sockets using epoll/kqueue:

class AsyncSocket {
public:
    static Task<AsyncSocket> connect(Runtime& runtime,
                                     std::string host,
                                     uint16_t port);

    Task<size_t> read(std::span<char> buffer);
    Task<size_t> write(std::span<const char> buffer);
    Task<void> close();
};

This would use the event loop pattern rather than thread pool offloading.


Resources

Essential Reading

  • “The Linux Programming Interface” by Michael Kerrisk
    • Chapter 4: File I/O: The Universal I/O Model
    • Chapter 5: File I/O: Further Details
    • Chapter 63: Alternative I/O Models
  • “C++ Concurrency in Action” by Anthony Williams
    • Chapter 4: Synchronizing concurrent operations
    • Chapter 9: Advanced thread management

Documentation

Video Resources

  • “C++20 Coroutines” - CppCon talks by various speakers
  • “io_uring: The Lord of the Rings” - Jonathan Corbet
  • “Asio: Asynchronous I/O” - Christopher Kohlhoff
  • cppcoro: Lewis Baker’s coroutine library
  • liburing: io_uring wrapper library
  • libuv: Node.js async I/O library
  • ASIO: Boost/standalone async I/O

Books That Will Help

| Topic | Book | Chapter |
|---|---|---|
| POSIX File I/O | The Linux Programming Interface | Ch. 4-5, 63 |
| Async patterns | C++ Concurrency in Action | Ch. 4, 9 |
| Thread pools | The Art of Multiprocessor Programming | Ch. 16 |
| Coroutines | (Online resources - see cppreference) | N/A |
| io_uring | (Kernel documentation + blog posts) | N/A |
| Systems programming | Computer Systems: A Programmer’s Perspective | Ch. 10 |

Self-Assessment Checklist

Before considering this project complete, verify:

Understanding

  • I can explain why traditional file I/O is blocking and how spawn_blocking makes it “async”
  • I understand the difference between true async I/O (io_uring) and thread-pool-based async
  • I can describe how the coroutine suspension/resumption works for spawn_blocking
  • I understand why we need separate thread pools for async coroutines vs blocking I/O
  • I can explain the performance tradeoffs of buffer sizes
  • I know when async file I/O provides benefits over sync I/O

Implementation

  • spawn_blocking correctly offloads work and returns results
  • AsyncFile::open handles all mode combinations
  • read() returns correct byte counts including 0 for EOF
  • write() handles partial writes correctly
  • Errors propagate as exceptions through coroutine suspension
  • File descriptors are properly closed (RAII + explicit close)
  • No memory leaks or use-after-free issues

Testing

  • Unit tests pass for all components
  • Integration tests cover full file copy workflow
  • Error handling tests verify exception behavior
  • Performance benchmarks show expected characteristics
  • Stress tests with concurrent operations pass
  • AddressSanitizer and ThreadSanitizer report no issues

Growth

  • I’ve benchmarked my implementation against sync I/O
  • I understand how production runtimes (Tokio, ASIO) structure their I/O
  • I can identify bottlenecks in async I/O code
  • I’m ready to explore io_uring for true async I/O

The Interview Questions They’ll Ask

After completing this project, you’ll be ready for these questions:

  1. “How do async file operations work under the hood?”
    • They want: Traditional file I/O is blocking; async is achieved by offloading to a thread pool that performs blocking calls, then resuming the waiting coroutine when complete. io_uring provides true kernel async.
  2. “What’s the difference between io_uring and thread-pool-based async I/O?”
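    • They want: A thread pool pays context-switch overhead and ties up one thread per in-flight blocking operation. io_uring uses kernel submission/completion queues shared with user space, so many operations can be submitted and reaped with few syscalls and no worker threads; zero-copy is possible in some configurations.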
  3. “How do you handle errors in async I/O?”
    • They want: Map errno to exceptions, propagate through coroutine suspension points, ensure resource cleanup happens regardless of error path.
  4. “Why can’t you use O_NONBLOCK for file I/O?”
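    • They want: O_NONBLOCK only changes behavior for descriptors that can report “not ready,” such as pipes, FIFOs, sockets, and terminals. Regular files are always treated as ready, so read() never returns EAGAIN and simply blocks while data is fetched from disk if it is not already in the page cache.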
  5. “How would you implement a high-performance file copy?”
    • They want: Overlapping reads and writes using double buffering, appropriate buffer sizes (typically 64KB-1MB), possibly using sendfile() or splice() for kernel-level optimization.
  6. “What’s the relationship between spawn_blocking and the async executor?”
    • They want: spawn_blocking queues work to an I/O thread pool (different from async executor threads), returns a Task that suspends the calling coroutine until the blocking work completes.
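
The answer to question 4 is easy to check empirically. The sketch below, with hypothetical function names, compares a read from an empty non-blocking pipe with a read from a regular file that has O_NONBLOCK set.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdlib>
#include <fcntl.h>
#include <unistd.h>

// Returns the errno of a 1-byte read from an empty non-blocking pipe.
int nonblocking_pipe_read_errno() {
    int fds[2];
    if (::pipe(fds) != 0) return -1;
    ::fcntl(fds[0], F_SETFL, O_NONBLOCK);
    char c;
    ssize_t n = ::read(fds[0], &c, 1);  // no data yet: fails instead of blocking
    int err = (n < 0) ? errno : 0;
    ::close(fds[0]);
    ::close(fds[1]);
    return err;
}

// Returns the byte count of a read from a regular file with O_NONBLOCK set.
ssize_t nonblocking_file_read() {
    char path[] = "/tmp/nonblock_demo_XXXXXX";
    int fd = ::mkstemp(path);
    if (fd < 0) return -1;
    ::unlink(path);
    if (::write(fd, "data", 4) != 4) {
        ::close(fd);
        return -1;
    }
    ::fcntl(fd, F_SETFL, O_NONBLOCK);  // accepted, but does not change reads
    char buf[8];
    ssize_t n = ::pread(fd, buf, sizeof buf, 0);  // returns the data, never EAGAIN
    ::close(fd);
    return n;
}
```

On Linux the pipe read fails with EAGAIN while the file read returns all 4 bytes. The flag gives the caller no way to learn that a regular-file read would have waited on the disk, which is the gap that spawn_blocking and io_uring fill.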

Submission / Completion Criteria

Minimum Viable Completion:

  • spawn_blocking works correctly
  • AsyncFile can open, read, write, and close
  • Basic file copy example runs successfully
  • Error handling works for common cases

Full Completion:

  • All AsyncFile operations implemented
  • Robust error handling with IOError class
  • Performance benchmarks show reasonable throughput
  • Unit tests for all components pass
  • Documentation explains design decisions

Excellence (Going Above & Beyond):

  • Overlapped read/write for improved throughput
  • Buffered writer for small writes
  • io_uring integration (Linux 5.1+)
  • Memory-mapped file support
  • Directory operation support
  • Comprehensive benchmark suite

This guide was expanded from LEARN_CPP_CONCURRENCY_AND_PARALLELISM.md. For the complete learning path, see the project index.