Project 16: Distributed Task Scheduler

Build a distributed task execution system in which tasks can be submitted to any node and executed on any node with available capacity, with support for task dependencies, result aggregation, and failure recovery. This is the capstone project that combines all concurrency concepts: thread pools for local execution, async I/O for network communication, lock-free structures for high performance, and coordination primitives for distributed consensus.


Quick Reference

Attribute                  Value
Project Number             16 of 17
Category                   C++ Concurrency Mastery
Difficulty                 Level 5: Master
Time Estimate              6-10 weeks
Main Programming Language  C++
Alternative Languages      Go, Rust, Java
Coolness Level             Level 5: Pure Magic (Super Cool)
Business Potential         5. The “Industry Disruptor”
Knowledge Area             Distributed Systems / Concurrency
Primary Tools              gRPC, Protobuf, C++20, ThreadSanitizer
Main Book                  “Designing Data-Intensive Applications” by Martin Kleppmann

Summary: Build a production-quality distributed task scheduler with coordinator-worker architecture, task dependency graphs (DAG execution), work stealing across network boundaries, failure recovery with retries and timeouts, and result aggregation. This project integrates every concurrency concept from the previous 15 projects into a cohesive distributed system.


Learning Objectives

By completing this project, you will be able to:

  1. Design distributed system architecture with coordinator-worker topology, clear separation of concerns, and well-defined communication protocols
  2. Implement network communication using gRPC or raw sockets with proper serialization, connection management, and error handling
  3. Build task dependency graphs (DAGs) with topological sorting, dependency resolution, and parallel execution of independent tasks
  4. Handle distributed failures including network partitions, worker crashes, task timeouts, and partial failures with retry logic
  5. Coordinate distributed state using leader election, heartbeats, and consensus-lite protocols
  6. Aggregate results from distributed workers using map-reduce patterns and streaming aggregation
  7. Apply all previous concurrency concepts: thread pools (P03), lock-free queues (P06), async I/O (P11), and actors (P15) in a real distributed context
  8. Answer interview questions about distributed systems, consensus, fault tolerance, and large-scale system design

Theoretical Foundation

Core Concepts

The Distributed Task Scheduling Problem

You have N worker machines. You want to execute M tasks (where M >> N) as efficiently as possible. Tasks may have dependencies (task B requires the output of task A). Workers may fail. The network may partition. How do you coordinate all of this?

                    THE PROBLEM SPACE

    +------------------+        +------------------+
    |   Task Graph     |        |   Worker Pool    |
    |                  |        |                  |
    |   A ──► B        |        |   Worker-1 (4c)  |
    |   │     │        |   ?    |   Worker-2 (8c)  |
    |   ▼     ▼        | ────►  |   Worker-3 (4c)  |
    |   C ──► D        |        |   Worker-4 (8c)  |
    |         │        |        |                  |
    |         ▼        |        |   (Workers may   |
    |         E        |        |    fail at any   |
    |                  |        |    time!)        |
    +------------------+        +------------------+

Questions you must answer:
1. Which tasks can run in parallel? (Only A initially; then B and C once A completes)
2. Which worker runs each task? (Load balancing)
3. What happens if Worker-2 crashes while running B? (Failure recovery)
4. How do results flow back and aggregate? (Data movement)
5. How does the coordinator know worker health? (Heartbeats)

Coordinator-Worker Architecture

The classic architecture for distributed task execution:

                        COORDINATOR-WORKER ARCHITECTURE

                           ┌─────────────────────┐
                           │                     │
                           │    COORDINATOR      │
                           │                     │
                           │  ┌──────────────┐   │
    Task Submission ───────►  │ Task Queue   │   │
                           │  │ [T1,T2,T3..] │   │
                           │  └──────────────┘   │
                           │         │           │
                           │  ┌──────▼───────┐   │
                           │  │ Scheduler    │   │
                           │  │              │   │
                           │  │ • DAG state  │   │
                           │  │ • Worker map │   │
                           │  │ • Assignments│   │
                           │  └──────────────┘   │
                           │         │           │
                           └─────────┼───────────┘
                                     │
                 ┌───────────────────┼───────────────────┐
                 │                   │                   │
        ┌────────▼────────┐ ┌───────▼────────┐ ┌───────▼────────┐
        │   WORKER-1      │ │   WORKER-2     │ │   WORKER-3     │
        │                 │ │                │ │                │
        │ ┌─────────────┐ │ │ ┌────────────┐ │ │ ┌────────────┐ │
        │ │ ThreadPool  │ │ │ │ ThreadPool │ │ │ │ ThreadPool │ │
        │ │ (4 threads) │ │ │ │ (8 threads)│ │ │ │ (4 threads)│ │
        │ └─────────────┘ │ │ └────────────┘ │ │ └────────────┘ │
        │                 │ │                │ │                │
        │ ┌─────────────┐ │ │ ┌────────────┐ │ │ ┌────────────┐ │
        │ │ Task Queue  │ │ │ │ Task Queue │ │ │ │ Task Queue │ │
        │ └─────────────┘ │ │ └────────────┘ │ │ └────────────┘ │
        │                 │ │                │ │                │
        │ Heartbeat ──────┼─┼────────────────┼─┼────────────────┤
        │                 │ │                │ │                │
        └─────────────────┘ └────────────────┘ └────────────────┘

Communication:
  ─────► Task assignment (coordinator → worker)
  ◄───── Result/status (worker → coordinator)
  ───────── Heartbeat (worker → coordinator, periodic)

Coordinator responsibilities:

  • Accept task submissions from clients
  • Track task state (pending, assigned, running, completed, failed)
  • Manage task dependency graph (DAG)
  • Assign tasks to workers based on capacity and locality
  • Handle task failures (reassignment, retries)
  • Aggregate results

Worker responsibilities:

  • Register with coordinator on startup
  • Report capacity (number of execution slots)
  • Pull/receive tasks from coordinator
  • Execute tasks using local thread pool
  • Report results and failures
  • Send periodic heartbeats

Task State Machine

Every task follows a well-defined state machine:

                        TASK STATE MACHINE

    ┌─────────────────────────────────────────────────────────────────┐
    │                                                                 │
    │       PENDING ────────────► ASSIGNED ────────────► RUNNING     │
    │          │                      │                      │        │
    │          │                      │                      │        │
    │          │    (worker crash)    │                      │        │
    │          ◄──────────────────────┘                      │        │
    │          │                                             │        │
    │          │    (timeout)                                │        │
    │          ◄─────────────────────────────────────────────┘        │
    │          │                                             │        │
    │          │                                             ▼        │
    │          │                                        COMPLETED     │
    │          │                                             │        │
    │          │                                             │        │
    │          │                      ┌──────────────────────┘        │
    │          │                      │                               │
    │          │                      ▼                               │
    │          │                  AGGREGATED                          │
    │          │                                                      │
    │          │                                                      │
    │          │                  ┌──────────┐                        │
    │          │                  │  FAILED  │                        │
    │          └─── (max retries) ─┤          │                        │
    │                             │ terminal │                        │
    │                             └──────────┘                        │
    │                                                                 │
    └─────────────────────────────────────────────────────────────────┘

State Transitions:
  PENDING   → ASSIGNED  : Scheduler assigns task to worker
  ASSIGNED  → RUNNING   : Worker acknowledges and starts execution
  RUNNING   → COMPLETED : Worker reports success with result
  RUNNING   → PENDING   : Timeout (reschedule to different worker)
  ASSIGNED  → PENDING   : Worker crash detected (heartbeat timeout)
  *         → FAILED    : Max retries exceeded
  COMPLETED → AGGREGATED: Result incorporated into job output
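
As a sanity check on the table above, here is a minimal sketch of how these transition rules could be enforced in code; the enum mirrors the diagram, and the free-function helper is illustrative rather than part of the specification:

#include <cstdint>

enum class TaskState : std::uint8_t {
    PENDING, ASSIGNED, RUNNING, COMPLETED, AGGREGATED, FAILED
};

// Returns true if the transition is allowed by the state machine above.
// Per the "* -> FAILED" rule, any state may move to FAILED once retries are exhausted.
inline bool can_transition(TaskState from, TaskState to) {
    if (to == TaskState::FAILED) return true;
    switch (from) {
        case TaskState::PENDING:   return to == TaskState::ASSIGNED;
        case TaskState::ASSIGNED:  return to == TaskState::RUNNING ||
                                          to == TaskState::PENDING;   // worker crash detected
        case TaskState::RUNNING:   return to == TaskState::COMPLETED ||
                                          to == TaskState::PENDING;   // timeout, reschedule
        case TaskState::COMPLETED: return to == TaskState::AGGREGATED;
        default:                   return false;  // AGGREGATED and FAILED are terminal
    }
}

A Task::can_transition_to(new_state) member (used in the unit tests later) can simply forward to a check like this.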

Dependency Graph (DAG) Execution

Tasks form a Directed Acyclic Graph (DAG) based on their dependencies:

                    DAG EXECUTION MODEL

    Job Definition:
    ┌─────────────────────────────────────────────────────────────┐
    │ Task A: Download file                                       │
    │ Task B: Parse JSON (depends on A)                           │
    │ Task C: Validate schema (depends on A)                      │
    │ Task D: Transform data (depends on B, C)                    │
    │ Task E: Upload result (depends on D)                        │
    └─────────────────────────────────────────────────────────────┘

    Visual Representation:
                    ┌───────┐
                    │   A   │  Level 0 (no dependencies)
                    └───┬───┘
                        │
            ┌───────────┴───────────┐
            │                       │
        ┌───▼───┐               ┌───▼───┐
        │   B   │               │   C   │  Level 1 (depend on A)
        └───┬───┘               └───┬───┘
            │                       │
            └───────────┬───────────┘
                        │
                    ┌───▼───┐
                    │   D   │  Level 2 (depends on B and C)
                    └───┬───┘
                        │
                    ┌───▼───┐
                    │   E   │  Level 3 (depends on D)
                    └───────┘

    Execution Timeline:
    ──────────────────────────────────────────────────────────────►
    Time   0        1        2        3        4        5

    A      [======]
    B               [=====]
    C               [===]
    D                        [========]
    E                                   [====]

    Note: B and C run in parallel because they only depend on A
          (they don't depend on each other)

Key DAG operations:

  1. Topological sort: Determine execution order respecting dependencies
  2. Ready set computation: Find tasks whose dependencies are all complete
  3. Fan-out tracking: When task completes, update all dependents
  4. Critical path analysis: Identify the longest dependency chain
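
For reference, here is a compact sketch of operation 1 using Kahn's algorithm; it doubles as a cycle check, because any task left unordered after the loop must sit on a cycle. The function is standalone and illustrative, not tied to the TaskDAG class defined later:

#include <queue>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>

// deps maps each task ID to the IDs of the tasks it depends on.
std::vector<std::string> topo_sort(
    const std::unordered_map<std::string, std::vector<std::string>>& deps) {
    std::unordered_map<std::string, int> unmet;                      // unmet dependency count
    std::unordered_map<std::string, std::vector<std::string>> dependents;
    for (const auto& [task, task_deps] : deps) {
        unmet[task];                                                 // ensure an entry exists
        for (const auto& d : task_deps) {
            ++unmet[task];
            dependents[d].push_back(task);                           // d -> tasks waiting on d
        }
    }

    std::queue<std::string> ready;                                   // zero unmet dependencies
    for (const auto& [task, count] : unmet)
        if (count == 0) ready.push(task);

    std::vector<std::string> order;
    while (!ready.empty()) {
        auto task = ready.front(); ready.pop();
        order.push_back(task);
        for (const auto& dependent : dependents[task])
            if (--unmet[dependent] == 0) ready.push(dependent);      // fan-out tracking
    }

    if (order.size() != unmet.size())
        throw std::runtime_error("dependency cycle (or missing dependency) detected");
    return order;
}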

Why This Matters

Distributed task scheduling is the foundation of:

  • MapReduce / Spark: Hadoop YARN, Spark scheduler
  • CI/CD systems: GitHub Actions, Jenkins, CircleCI
  • Workflow orchestration: Airflow, Prefect, Temporal
  • Container orchestration: Kubernetes scheduler
  • Distributed builds: Bazel remote execution, distcc
  • Machine learning: Distributed training, hyperparameter search

Understanding this architecture lets you:

  1. Design scalable batch processing systems
  2. Build fault-tolerant job execution platforms
  3. Reason about distributed system failures
  4. Interview for infrastructure/platform roles

Historical Context

1990s: First distributed job schedulers (Condor, PBS) for HPC clusters
2004:  MapReduce paper from Google defines modern distributed batch processing
2006:  Hadoop open-sources MapReduce concepts
2010s: Container orchestration (Kubernetes) and workflow systems (Airflow)
2020s: Serverless task execution, edge computing coordination

Common Misconceptions

Misconception 1: “Just use a message queue”
Reality: A message queue handles task delivery but not dependency graphs, result aggregation, failure recovery, or scheduling decisions. You need orchestration logic on top.

Misconception 2: “Distributed = just more threads”
Reality: Network failures, partial failures, and ordering problems make distributed systems fundamentally different from concurrent programs.

Misconception 3: “Consensus is required for everything”
Reality: Full consensus (Raft/Paxos) is expensive. Many systems use weaker consistency with application-level conflict resolution.

Misconception 4: “Retries solve all failures”
Reality: Idempotency, exactly-once semantics, and side-effect management are hard problems that retries alone don’t solve.


Project Specification

What You Will Build

A distributed task scheduler with the following components:

  1. Coordinator Node (dtask coordinator)
    • Accepts task/job submissions via gRPC
    • Manages task state machine and dependency graph
    • Schedules tasks to workers based on capacity
    • Tracks worker health via heartbeats
    • Handles failure recovery and retries
    • Aggregates results
  2. Worker Node (dtask worker)
    • Registers with coordinator on startup
    • Runs local thread pool (from Project 3)
    • Executes assigned tasks
    • Reports completion, failure, and progress
    • Sends heartbeats to coordinator
  3. Client CLI (dtask submit, dtask status)
    • Submits jobs with task definitions and dependencies
    • Queries job/task status
    • Retrieves results

Functional Requirements

ID    Requirement                                          Priority
FR1   Submit jobs with multiple tasks and dependencies     Must
FR2   Execute tasks on workers with available capacity     Must
FR3   Track task state transitions correctly               Must
FR4   Handle worker crashes with task reassignment         Must
FR5   Respect task dependencies (DAG execution)            Must
FR6   Aggregate results from completed tasks               Must
FR7   Support task timeouts with configurable limits       Must
FR8   Retry failed tasks with configurable retry count     Must
FR9   Work stealing across workers (optional)              Should
FR10  Support task priorities                              Should
FR11  Support task cancellation                            Should
FR12  Graceful coordinator failover (bonus)                Could

Non-Functional Requirements

Metric                Target         Description
Task Throughput       > 10,000/sec   Tasks scheduled per second
Task Latency (P50)    < 10ms         Time from ready to running
Task Latency (P99)    < 100ms        99th percentile scheduling latency
Worker Recovery       < 5s           Time to detect failed worker and reschedule
Coordinator Recovery  < 30s          Time to recover coordinator state (bonus)
Scalability           100+ workers   System should scale linearly
Memory Overhead       < 1KB/task     Per-task memory in coordinator

Example Usage/Output

Terminal 1: Start Coordinator

$ ./dtask coordinator --port 8080 --heartbeat-interval 1s --task-timeout 60s
[Coordinator] Starting on port 8080
[Coordinator] Configuration:
  Heartbeat interval: 1s
  Task timeout: 60s
  Max retries: 3
[Coordinator] Waiting for workers...

Terminal 2-4: Start Workers

$ ./dtask worker --coordinator localhost:8080 --threads 4 --name worker-1
[worker-1] Connecting to coordinator at localhost:8080...
[worker-1] Connected! Advertising 4 execution slots
[worker-1] Waiting for tasks...
$ ./dtask worker --coordinator localhost:8080 --threads 8 --name worker-2
[worker-2] Connected! Advertising 8 execution slots
$ ./dtask worker --coordinator localhost:8080 --threads 4 --name worker-3
[worker-3] Connected! Advertising 4 execution slots

Terminal 5: Submit Job

$ cat wordcount.yaml
job:
  name: wordcount-example
  tasks:
    - id: map-001
      type: map
      input: s3://bucket/input/chunk-001.txt
      output: /tmp/map-001.json

    - id: map-002
      type: map
      input: s3://bucket/input/chunk-002.txt
      output: /tmp/map-002.json

    - id: map-003
      type: map
      input: s3://bucket/input/chunk-003.txt
      output: /tmp/map-003.json

    - id: reduce-001
      type: reduce
      dependencies: [map-001, map-002, map-003]
      inputs: [/tmp/map-001.json, /tmp/map-002.json, /tmp/map-003.json]
      output: /tmp/final-result.json

$ ./dtask submit --coordinator localhost:8080 --job wordcount.yaml
Submitting job: wordcount-example
  Tasks: 4 (3 map + 1 reduce)
  Dependencies: reduce-001 depends on [map-001, map-002, map-003]

[Coordinator] Job accepted, ID: job-20240115-001
[Coordinator] Scheduling 3 map tasks (no dependencies)...

Coordinator Output:

[Coordinator] Job job-20240115-001 accepted
[Coordinator] Task DAG analysis:
  Level 0: map-001, map-002, map-003 (can run immediately)
  Level 1: reduce-001 (depends on 3 tasks)

[Coordinator] Assigning map-001 to worker-1 (3/4 slots available)
[Coordinator] Assigning map-002 to worker-2 (7/8 slots available)
[Coordinator] Assigning map-003 to worker-3 (3/4 slots available)

[worker-1] Executing map-001...
[worker-2] Executing map-002...
[worker-3] Executing map-003...

[Coordinator] map-002 completed on worker-2 (1.2s)
[Coordinator] map-003 completed on worker-3 (1.4s)
[Coordinator] map-001 completed on worker-1 (1.8s)

[Coordinator] All dependencies for reduce-001 satisfied
[Coordinator] Assigning reduce-001 to worker-2 (8/8 slots, most capacity)

[worker-2] Executing reduce-001...
[Coordinator] reduce-001 completed on worker-2 (0.8s)

[Coordinator] Job job-20240115-001 complete!

Job Summary:
  Total time: 2.6s
  Tasks executed: 4
  Task failures: 0
  Worker utilization:
    worker-1: 1 task (map-001)
    worker-2: 2 tasks (map-002, reduce-001)
    worker-3: 1 task (map-003)

Result: /tmp/final-result.json

Failure Scenario:

[worker-1] Executing map-001...
[worker-1] CRASHED!

[Coordinator] Heartbeat timeout from worker-1 (>3s)
[Coordinator] Marking worker-1 as DEAD
[Coordinator] Reassigning map-001 to worker-3

[worker-3] Executing map-001...
[Coordinator] map-001 completed on worker-3 (1.8s, retry #1)

[Coordinator] Job complete despite worker failure!
  Task failures: 1 (recovered)

Real World Outcome

After implementing this project, you will have built a system similar to:

  • Apache Mesos: Resource manager and task scheduler
  • Kubernetes Job controller: Batch workload scheduling
  • Celery: Distributed task queue for Python
  • Dask distributed: Parallel computing scheduler
  • Temporal/Cadence: Workflow orchestration engine

Your implementation will demonstrate:

  1. Distributed systems design skills
  2. Fault tolerance and recovery patterns
  3. State machine design
  4. Network programming with gRPC
  5. Integration of all concurrency concepts from previous projects

Solution Architecture

High-Level Design

┌─────────────────────────────────────────────────────────────────────────────┐
│                    DISTRIBUTED TASK SCHEDULER                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   ┌────────────────────────────────────────────────────────────────────┐    │
│   │                          COORDINATOR                                │    │
│   │                                                                     │    │
│   │   ┌──────────────┐  ┌──────────────┐  ┌──────────────┐            │    │
│   │   │ gRPC Server  │  │ Job Manager  │  │ Worker       │            │    │
│   │   │              │  │              │  │ Registry     │            │    │
│   │   │ • Submit()   │  │ • Jobs map   │  │              │            │    │
│   │   │ • Status()   │  │ • Task DAG   │  │ • Workers    │            │    │
│   │   │ • Register() │  │ • State      │  │ • Capacity   │            │    │
│   │   │ • Heartbeat()│  │   machine    │  │ • Health     │            │    │
│   │   └──────┬───────┘  └──────┬───────┘  └──────┬───────┘            │    │
│   │          │                 │                 │                     │    │
│   │          └─────────────────┼─────────────────┘                     │    │
│   │                            │                                       │    │
│   │                   ┌────────▼────────┐                             │    │
│   │                   │    Scheduler    │                             │    │
│   │                   │                 │                             │    │
│   │                   │ • Ready queue   │                             │    │
│   │                   │ • Assignments   │                             │    │
│   │                   │ • Load balance  │                             │    │
│   │                   └────────┬────────┘                             │    │
│   │                            │                                       │    │
│   └────────────────────────────┼───────────────────────────────────────┘    │
│                                │                                             │
│                      gRPC (Task Assignment, Results)                         │
│                                │                                             │
│         ┌──────────────────────┼──────────────────────┐                     │
│         │                      │                      │                      │
│   ┌─────▼─────┐          ┌─────▼─────┐          ┌─────▼─────┐              │
│   │  WORKER-1 │          │  WORKER-2 │          │  WORKER-3 │              │
│   │           │          │           │          │           │              │
│   │ ┌───────┐ │          │ ┌───────┐ │          │ ┌───────┐ │              │
│   │ │Thread │ │          │ │Thread │ │          │ │Thread │ │              │
│   │ │Pool   │ │          │ │Pool   │ │          │ │Pool   │ │              │
│   │ │(4 thr)│ │          │ │(8 thr)│ │          │ │(4 thr)│ │              │
│   │ └───────┘ │          │ └───────┘ │          │ └───────┘ │              │
│   │           │          │           │          │           │              │
│   │ ┌───────┐ │          │ ┌───────┐ │          │ ┌───────┐ │              │
│   │ │Task   │ │          │ │Task   │ │          │ │Task   │ │              │
│   │ │Queue  │ │          │ │Queue  │ │          │ │Queue  │ │              │
│   │ └───────┘ │          │ └───────┘ │          │ └───────┘ │              │
│   │           │          │           │          │           │              │
│   │ Heartbeat │          │ Heartbeat │          │ Heartbeat │              │
│   │ (1s)     ───────────────────────────────────► Coordinator              │
│   │           │          │           │          │           │              │
│   └───────────┘          └───────────┘          └───────────┘              │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Key Components

1. Coordinator Components

// Job and Task representation
struct Task {
    std::string id;
    std::string job_id;
    TaskType type;
    std::vector<std::string> dependencies;
    TaskState state;
    std::optional<std::string> assigned_worker;
    std::chrono::steady_clock::time_point assigned_at;
    int retry_count;
    std::any result;
    std::any input;
};

struct Job {
    std::string id;
    std::string name;
    std::unordered_map<std::string, Task> tasks;
    std::unordered_map<std::string, std::vector<std::string>> dependency_graph;
    std::unordered_map<std::string, std::vector<std::string>> reverse_deps;
    JobState state;
    std::chrono::steady_clock::time_point submitted_at;
    std::chrono::steady_clock::time_point completed_at;
};

// Worker tracking
struct WorkerInfo {
    std::string id;
    std::string address;
    int total_slots;
    int available_slots;
    std::chrono::steady_clock::time_point last_heartbeat;
    WorkerState state;
    std::vector<std::string> assigned_tasks;
};

// Main coordinator class
class Coordinator {
public:
    // gRPC service methods
    grpc::Status SubmitJob(const JobRequest& request, JobResponse* response);
    grpc::Status GetJobStatus(const StatusRequest& request, StatusResponse* response);
    grpc::Status RegisterWorker(const WorkerInfo& info, RegistrationResponse* response);
    grpc::Status Heartbeat(const HeartbeatRequest& request, HeartbeatResponse* response);
    grpc::Status ReportTaskComplete(const TaskResult& result, Ack* response);

private:
    // Core state
    std::unordered_map<std::string, Job> jobs_;
    std::unordered_map<std::string, WorkerInfo> workers_;
    std::priority_queue<TaskRef> ready_queue_;

    // Synchronization
    mutable std::shared_mutex jobs_mutex_;
    mutable std::shared_mutex workers_mutex_;

    // Background threads
    std::thread scheduler_thread_;
    std::thread heartbeat_checker_thread_;

    // Internal methods
    void schedule_loop();
    void check_heartbeats();
    std::optional<std::string> select_worker(const Task& task);
    void handle_worker_failure(const std::string& worker_id);
    void update_task_state(const std::string& job_id, const std::string& task_id, TaskState new_state);
    std::vector<std::string> get_ready_tasks(const Job& job);
};

2. Worker Components

// Worker implementation using previous projects
class Worker {
public:
    Worker(const std::string& coordinator_addr, int num_threads, const std::string& name);

    void run();  // Main worker loop
    void shutdown();

private:
    // From Project 3: Work-stealing thread pool
    std::unique_ptr<WorkStealingPool> thread_pool_;

    // From Project 6: Lock-free task queue
    LockFreeQueue<TaskAssignment> incoming_tasks_;

    // gRPC client to coordinator
    std::unique_ptr<CoordinatorClient> coordinator_;

    // Worker state
    std::string id_;
    std::string name_;
    std::atomic<int> available_slots_;
    std::atomic<bool> shutdown_{false};

    // Background threads
    std::thread heartbeat_thread_;
    std::thread task_receiver_thread_;

    // Task execution
    void execute_task(const TaskAssignment& task);
    void report_result(const std::string& task_id, const TaskResult& result);
    void send_heartbeat();
};

3. Communication Protocol (gRPC)

// scheduler.proto

syntax = "proto3";

package dtask;

service Coordinator {
    // Job management
    rpc SubmitJob(JobSubmission) returns (JobResponse);
    rpc GetJobStatus(JobStatusRequest) returns (JobStatusResponse);
    rpc CancelJob(CancelRequest) returns (CancelResponse);

    // Worker management
    rpc RegisterWorker(WorkerRegistration) returns (RegistrationResponse);
    rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);

    // Task execution
    rpc GetTask(TaskRequest) returns (TaskAssignment);
    rpc ReportTaskComplete(TaskResult) returns (Ack);
    rpc ReportTaskFailed(TaskFailure) returns (Ack);
}

message JobSubmission {
    string name = 1;
    repeated TaskDefinition tasks = 2;
}

message TaskDefinition {
    string id = 1;
    string type = 2;
    repeated string dependencies = 3;
    bytes input = 4;  // Serialized task input
}

message TaskAssignment {
    string job_id = 1;
    string task_id = 2;
    string type = 3;
    bytes input = 4;
    int64 timeout_ms = 5;
}

message TaskResult {
    string job_id = 1;
    string task_id = 2;
    bool success = 3;
    bytes output = 4;  // Serialized result
    string error_message = 5;
    int64 execution_time_ms = 6;
}

message HeartbeatRequest {
    string worker_id = 1;
    int32 available_slots = 2;
    repeated string running_tasks = 3;
}

message HeartbeatResponse {
    bool acknowledged = 1;
    repeated string tasks_to_cancel = 2;  // For cancellation support
}
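
To make the worker side of this protocol concrete, here is a sketch of a heartbeat call, assuming the synchronous client stub that protoc and grpc_cpp_plugin generate from the service definition above; deadlines and error handling are kept deliberately minimal:

#include <chrono>
#include <string>
#include <grpcpp/grpcpp.h>
#include "scheduler.grpc.pb.h"   // generated from scheduler.proto

// Send one heartbeat via the generated synchronous stub.
void send_heartbeat(dtask::Coordinator::Stub& stub,
                    const std::string& worker_id,
                    int available_slots) {
    dtask::HeartbeatRequest request;
    request.set_worker_id(worker_id);
    request.set_available_slots(available_slots);

    dtask::HeartbeatResponse response;
    grpc::ClientContext context;
    context.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(2));

    grpc::Status status = stub.Heartbeat(&context, request, &response);
    if (!status.ok()) {
        // Coordinator unreachable or slow: keep retrying on the next tick and
        // fall back to re-registration if the failures persist.
    }
}

// Typical worker-side setup:
//   auto channel = grpc::CreateChannel("localhost:8080", grpc::InsecureChannelCredentials());
//   auto stub = dtask::Coordinator::NewStub(channel);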

Data Structures

Task Dependency Graph (DAG)

class TaskDAG {
public:
    // Add task with dependencies
    void add_task(const std::string& task_id, const std::vector<std::string>& deps);

    // Get tasks with no unmet dependencies
    std::vector<std::string> get_ready_tasks() const;

    // Mark task as complete and update dependents
    std::vector<std::string> complete_task(const std::string& task_id);

    // Topological sort for execution order
    std::vector<std::string> topological_sort() const;

    // Check for cycles (validation)
    bool has_cycles() const;

private:
    // Adjacency list: task -> tasks that depend on it
    std::unordered_map<std::string, std::vector<std::string>> forward_edges_;

    // Reverse: task -> tasks it depends on
    std::unordered_map<std::string, std::vector<std::string>> backward_edges_;

    // Count of unmet dependencies per task
    std::unordered_map<std::string, int> unmet_dep_count_;

    // Set of completed tasks
    std::unordered_set<std::string> completed_;
};
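
One way the bookkeeping could work, sketched against the interface and private members above (it assumes all tasks are added before execution starts and that dependencies refer to tasks in the same job):

void TaskDAG::add_task(const std::string& task_id,
                       const std::vector<std::string>& deps) {
    unmet_dep_count_[task_id] = static_cast<int>(deps.size());
    backward_edges_[task_id] = deps;
    for (const auto& dep : deps) {
        forward_edges_[dep].push_back(task_id);          // dep -> tasks that wait on it
    }
}

std::vector<std::string> TaskDAG::get_ready_tasks() const {
    // Ready = all dependencies met and not yet completed. Filtering out tasks
    // that are already ASSIGNED/RUNNING is the coordinator's job (per-task state).
    std::vector<std::string> ready;
    for (const auto& [task_id, unmet] : unmet_dep_count_) {
        if (unmet == 0 && completed_.count(task_id) == 0) {
            ready.push_back(task_id);
        }
    }
    return ready;
}

std::vector<std::string> TaskDAG::complete_task(const std::string& task_id) {
    completed_.insert(task_id);
    std::vector<std::string> newly_ready;
    auto it = forward_edges_.find(task_id);
    if (it == forward_edges_.end()) return newly_ready;  // terminal task, no dependents
    for (const auto& dependent : it->second) {
        if (--unmet_dep_count_[dependent] == 0) {
            newly_ready.push_back(dependent);            // all of its dependencies are done
        }
    }
    return newly_ready;
}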

Worker Load Balancer

class WorkerLoadBalancer {
public:
    // Register worker with capacity
    void add_worker(const std::string& id, int capacity);

    // Remove worker (on failure)
    void remove_worker(const std::string& id);

    // Update available capacity
    void update_capacity(const std::string& id, int available);

    // Select best worker for task (returns nullopt if none available)
    std::optional<std::string> select_worker(const Task& task);

    // Strategies
    enum class Strategy {
        LeastLoaded,      // Pick worker with most available slots
        RoundRobin,       // Rotate through workers
        Random,           // Random selection from available
        LocalityAware     // Prefer workers with cached data
    };

    void set_strategy(Strategy s);

private:
    std::unordered_map<std::string, int> worker_capacity_;
    std::unordered_map<std::string, int> worker_available_;
    Strategy strategy_ = Strategy::LeastLoaded;
    std::atomic<size_t> round_robin_index_{0};
};
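
A sketch of how select_worker might implement the first two strategies (Random and LocalityAware are omitted); the LeastLoaded branch matches what the scheduler unit test later in this guide expects:

std::optional<std::string> WorkerLoadBalancer::select_worker(const Task& /*task*/) {
    switch (strategy_) {
        case Strategy::LeastLoaded: {
            std::optional<std::string> best;
            int best_slots = 0;
            for (const auto& [id, available] : worker_available_) {
                if (available > best_slots) {            // most free slots wins
                    best_slots = available;
                    best = id;
                }
            }
            return best;                                 // nullopt if every worker is full
        }
        case Strategy::RoundRobin: {
            if (worker_available_.empty()) return std::nullopt;
            std::vector<std::string> ids;
            for (const auto& [id, unused] : worker_available_) ids.push_back(id);
            // Rotate through workers, skipping any that have no free slots.
            for (size_t i = 0; i < ids.size(); ++i) {
                const auto& id = ids[round_robin_index_++ % ids.size()];
                if (worker_available_[id] > 0) return id;
            }
            return std::nullopt;
        }
        default:
            return std::nullopt;                         // other strategies: see Extensions
    }
}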

Algorithm Overview

Scheduling Algorithm

SCHEDULE_TASKS():
    while not shutdown:
        # 1. Check for ready tasks across all jobs
        ready_tasks = []
        for job in active_jobs:
            for task in job.get_ready_tasks():
                if task.state == PENDING:
                    ready_tasks.append(task)

        # 2. Sort by priority (if implemented)
        ready_tasks.sort(by=priority, order=descending)

        # 3. Assign to available workers
        for task in ready_tasks:
            worker = load_balancer.select_worker(task)
            if worker:
                assign_task(task, worker)
                task.state = ASSIGNED
                task.assigned_at = now()
            else:
                # No workers available, try again later
                break

        # 4. Check for timeouts
        for task in running_tasks:
            if now() - task.assigned_at > task.timeout:
                handle_timeout(task)

        sleep(10ms)  # Scheduling interval

Failure Recovery Algorithm

HANDLE_WORKER_FAILURE(worker_id):
    # 1. Mark worker as dead
    worker.state = DEAD

    # 2. Collect all tasks assigned to this worker
    affected_tasks = []
    for task in worker.assigned_tasks:
        affected_tasks.append(task)

    # 3. Reset task state for rescheduling
    for task in affected_tasks:
        if task.retry_count < MAX_RETRIES:
            task.state = PENDING
            task.assigned_worker = None
            task.retry_count += 1
            log("Rescheduling task {} (retry {})", task.id, task.retry_count)
        else:
            task.state = FAILED
            log("Task {} exceeded max retries, marking as FAILED", task.id)
            mark_job_failed(task.job_id)

    # 4. Remove worker from registry
    workers.remove(worker_id)

    # 5. Trigger immediate scheduling
    notify_scheduler()

Result Aggregation Algorithm

AGGREGATE_RESULTS(job_id):
    job = jobs[job_id]

    # Check if all tasks complete
    for task in job.tasks:
        if task.state != COMPLETED:
            return  # Not ready yet

    # Collect results in dependency order
    ordered_tasks = job.dag.topological_sort()
    results = {}

    for task_id in ordered_tasks:
        task = job.tasks[task_id]
        results[task_id] = task.result

    # Find final tasks (no dependents)
    final_tasks = job.dag.get_terminal_tasks()

    # Merge final results
    job.final_result = merge_results([results[t] for t in final_tasks])
    job.state = COMPLETED
    job.completed_at = now()

    notify_client(job_id, job.final_result)

Implementation Guide

Development Environment Setup

Required tools:

# C++20 compiler
clang++ --version  # 14+ recommended
g++ --version      # 11+ recommended

# gRPC and Protobuf
brew install grpc protobuf  # macOS
apt install libgrpc++-dev protobuf-compiler protobuf-compiler-grpc  # Ubuntu (grpc_cpp_plugin comes from protobuf-compiler-grpc)

# CMake
cmake --version  # 3.20+

# Optional but recommended
brew install abseil  # For advanced utilities

CMakeLists.txt:

cmake_minimum_required(VERSION 3.20)
project(dtask)

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

find_package(Protobuf REQUIRED)
find_package(gRPC CONFIG REQUIRED)

# Generate protobuf/gRPC code (backticks are not expanded by add_custom_command,
# so locate the gRPC plugin explicitly)
find_program(GRPC_CPP_PLUGIN grpc_cpp_plugin REQUIRED)
add_custom_command(
    OUTPUT scheduler.pb.cc scheduler.pb.h scheduler.grpc.pb.cc scheduler.grpc.pb.h
    COMMAND protoc --cpp_out=. --grpc_out=. --plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN} scheduler.proto
    DEPENDS scheduler.proto
)

# Coordinator
add_executable(dtask-coordinator
    src/coordinator/main.cpp
    src/coordinator/coordinator.cpp
    src/coordinator/job_manager.cpp
    src/coordinator/scheduler.cpp
    src/coordinator/worker_registry.cpp
    scheduler.pb.cc
    scheduler.grpc.pb.cc
)
target_link_libraries(dtask-coordinator gRPC::grpc++ protobuf::libprotobuf pthread)

# Worker
add_executable(dtask-worker
    src/worker/main.cpp
    src/worker/worker.cpp
    src/worker/task_executor.cpp
    src/common/thread_pool.cpp
    scheduler.pb.cc
    scheduler.grpc.pb.cc
)
target_link_libraries(dtask-worker gRPC::grpc++ protobuf::libprotobuf pthread)

# CLI
add_executable(dtask
    src/cli/main.cpp
    src/cli/submit.cpp
    src/cli/status.cpp
    scheduler.pb.cc
    scheduler.grpc.pb.cc
)
target_link_libraries(dtask gRPC::grpc++ protobuf::libprotobuf)

# Tests
enable_testing()
add_executable(dtask-tests
    tests/dag_tests.cpp
    tests/scheduler_tests.cpp
    tests/integration_tests.cpp
)
target_link_libraries(dtask-tests gtest_main pthread)

Project Structure

dtask/
├── CMakeLists.txt
├── proto/
│   └── scheduler.proto           # gRPC service definition
├── src/
│   ├── coordinator/
│   │   ├── main.cpp              # Coordinator entry point
│   │   ├── coordinator.hpp       # Main coordinator class
│   │   ├── coordinator.cpp
│   │   ├── job_manager.hpp       # Job and task tracking
│   │   ├── job_manager.cpp
│   │   ├── scheduler.hpp         # Task scheduling logic
│   │   ├── scheduler.cpp
│   │   ├── worker_registry.hpp   # Worker tracking
│   │   └── worker_registry.cpp
│   ├── worker/
│   │   ├── main.cpp              # Worker entry point
│   │   ├── worker.hpp            # Main worker class
│   │   ├── worker.cpp
│   │   ├── task_executor.hpp     # Task execution
│   │   └── task_executor.cpp
│   ├── cli/
│   │   ├── main.cpp              # CLI entry point
│   │   ├── submit.cpp            # Job submission
│   │   └── status.cpp            # Status queries
│   └── common/
│       ├── thread_pool.hpp       # From Project 3
│       ├── thread_pool.cpp
│       ├── lock_free_queue.hpp   # From Project 6
│       ├── dag.hpp               # Task dependency graph
│       ├── dag.cpp
│       ├── types.hpp             # Shared types
│       └── logging.hpp           # Logging utilities
├── tests/
│   ├── dag_tests.cpp
│   ├── scheduler_tests.cpp
│   ├── failure_tests.cpp
│   └── integration_tests.cpp
├── examples/
│   ├── wordcount.yaml            # Example job definition
│   └── pipeline.yaml
└── README.md

The Core Question You’re Answering

“How do you coordinate task execution across multiple unreliable machines while respecting dependencies, handling failures, and maximizing throughput?”

This is the fundamental question of distributed computing. Your implementation must answer:

  1. How do you track what work needs to be done? (Job manager, DAG)
  2. How do you decide where to run it? (Scheduler, load balancer)
  3. How do you know if a worker is alive? (Heartbeats)
  4. What happens when things fail? (Retry logic, state machine)
  5. How do you get results back? (Aggregation, client notification)

Concepts You Must Understand First

Before starting implementation, verify you understand these concepts:

Concept         Self-Check Question                                                 Book Reference
gRPC            Can you explain the difference between unary and streaming RPCs?   gRPC documentation
Protobuf        Can you define a message with nested types and enums?              Protocol Buffers docs
DAG             Can you implement topological sort?                                CLRS Chapter 22
State machines  Can you draw the state diagram for a task?                         Design Patterns
Thread pools    Can you explain how work-stealing balances load?                   Project 3
Heartbeats      Can you explain how to detect a failed node?                       Kleppmann Chapter 8
Retries         Can you explain idempotency and why it matters?                    Kleppmann Chapter 8

Questions to Guide Your Design

Architecture Questions:

  1. Should task state live in the coordinator only, or be replicated?
  2. How should workers discover the coordinator? (Config file? DNS? Service discovery?)
  3. What happens if the coordinator crashes? (State persistence? Leader election?)

Scheduling Questions:

  1. Push model (coordinator sends tasks) or pull model (workers request tasks)?
  2. How do you handle “sticky” tasks that should run on specific workers?
  3. How do you prevent task starvation for low-priority jobs?

Failure Questions:

  1. How long should heartbeat timeout be? (Trade-off: fast detection vs false positives)
  2. What if a task has side effects that can’t be retried?
  3. How do you handle partial job failure?

Performance Questions:

  1. How do you batch small tasks to reduce RPC overhead?
  2. How do you minimize lock contention in the coordinator?
  3. How do you scale to 100+ workers?

Thinking Exercise

Before coding, trace through this scenario manually:

Job submission:
  Task A: no dependencies
  Task B: depends on A
  Task C: depends on A
  Task D: depends on B, C

Workers: W1 (2 slots), W2 (2 slots)

Trace the execution:
1. What tasks can run immediately?
2. Draw the coordinator's state after A completes on W1
3. What if W2 crashes while running C?
4. When does D become ready?
5. What is the total execution time if each task takes 1 second?

Draw the timeline:

Time: 0    1    2    3
W1:   [A]  [B]  [D]
W2:        [C]
      ^    ^    ^
      |    |    |
      |    |    D can start (B,C done)
      |    B,C can start (A done)
      A starts (no deps)

Total: 3 seconds with no failures (B and C overlap because they only share the dependency on A).

Hints in Layers

Hint 1: Starting Point (Conceptual Direction)

Start with the simplest possible system that works:

  1. Single-node coordinator (no replication)
  2. Simple FIFO scheduling (no priorities)
  3. Tasks have no dependencies (parallel batch)
  4. No failure recovery yet

Get this working end-to-end, then add complexity.

Hint 2: Next Level (More Specific Guidance)

Implement in this order:

  1. gRPC skeleton: Define service, generate code, verify connectivity
  2. Worker registration: Workers can connect and report capacity
  3. Task submission: Coordinator accepts jobs with tasks
  4. Basic scheduling: Assign tasks to workers round-robin
  5. Result reporting: Workers report completion
  6. DAG support: Add dependency tracking
  7. Heartbeats: Detect worker failures
  8. Retry logic: Reschedule failed tasks

Hint 3: Technical Details (Approach/Pseudocode)

Coordinator main loop:

void Coordinator::run() {
    // Start gRPC server in background
    grpc_server_thread_ = std::thread([this] {
        grpc::ServerBuilder builder;
        builder.AddListeningPort(address_, grpc::InsecureServerCredentials());
        builder.RegisterService(this);
        auto server = builder.BuildAndStart();
        server->Wait();
    });

    // Start scheduler thread
    scheduler_thread_ = std::thread([this] {
        while (!shutdown_) {
            schedule_ready_tasks();
            std::this_thread::sleep_for(10ms);
        }
    });

    // Start heartbeat checker
    heartbeat_thread_ = std::thread([this] {
        while (!shutdown_) {
            check_worker_health();
            std::this_thread::sleep_for(1s);
        }
    });

    // Wait for shutdown
    shutdown_cv_.wait(...);
}

Worker main loop:

void Worker::run() {
    // Register with coordinator
    register_with_coordinator();

    // Start heartbeat thread
    heartbeat_thread_ = std::thread([this] {
        while (!shutdown_) {
            send_heartbeat();
            std::this_thread::sleep_for(1s);
        }
    });

    // Main task execution loop
    while (!shutdown_) {
        // Request task from coordinator
        auto task = request_task();
        if (task) {
            // Execute using thread pool
            auto future = thread_pool_->submit([this, task] {
                return execute_task(*task);
            });

            // Report result when done
            auto result = future.get();
            report_result(task->task_id, result);
        } else {
            // No tasks available, back off
            std::this_thread::sleep_for(100ms);
        }
    }
}
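
The request_task() helper above is not defined anywhere in this guide. A minimal sketch using the GetTask RPC from scheduler.proto follows; note that the TaskRequest message is not shown in the proto excerpt, so the worker_id field used here is an assumption:

#include <optional>
#include <string>
#include <grpcpp/grpcpp.h>
#include "scheduler.grpc.pb.h"

std::optional<dtask::TaskAssignment> request_task(dtask::Coordinator::Stub& stub,
                                                  const std::string& worker_id) {
    dtask::TaskRequest request;
    request.set_worker_id(worker_id);        // assumed field; define it in your proto

    dtask::TaskAssignment assignment;
    grpc::ClientContext context;
    grpc::Status status = stub.GetTask(&context, request, &assignment);

    // Treat an RPC failure or an empty assignment as "no work right now";
    // the caller backs off and polls again (pull model).
    if (!status.ok() || assignment.task_id().empty()) {
        return std::nullopt;
    }
    return assignment;
}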

Hint 4: Tools and Debugging

Debugging distributed systems is HARD. Use these tools:

  1. Structured logging with request IDs:
    #define LOG(level, ...) log_impl(level, job_id, task_id, __VA_ARGS__)
    // Enables tracing a single task through the system
    
  2. State visualization:
    void Coordinator::dump_state() {
     for (const auto& [job_id, job] : jobs_) {
         std::cout << "Job " << job_id << ":\n";
         for (const auto& [task_id, task] : job.tasks) {
             std::cout << "  " << task_id << ": " << to_string(task.state);
             if (task.assigned_worker) {
                 std::cout << " on " << *task.assigned_worker;
             }
             std::cout << "\n";
         }
     }
    }
    
  3. Simulate failures in tests:
    class FaultInjector {
    public:
     void set_worker_crash_probability(double p);
     void set_network_delay_ms(int min, int max);
     void crash_worker(const std::string& id);
     void partition_network(const std::string& worker_id);
    };
    
  4. ThreadSanitizer for race detection:
    clang++ -fsanitize=thread -g -O1 ...
    

The Interview Questions They’ll Ask

After completing this project, be ready to answer:

  1. “How would you handle coordinator failure?”
    • Answer: State persistence to disk/database, or leader election with Raft for HA
  2. “What happens if two workers both think they’re executing the same task?”
    • Answer: Idempotency tokens, at-most-once delivery, or conflict resolution
  3. “How would you scale to 10,000 workers?”
    • Answer: Hierarchical scheduling, partitioned coordinators, pull-based task assignment
  4. “What if tasks take hours and heartbeats fail occasionally?”
    • Answer: Adaptive heartbeat intervals, task-level heartbeats, lease-based ownership
  5. “How would you add exactly-once semantics?”
    • Answer: Two-phase commit, saga pattern, or idempotent task design
  6. “Walk me through what happens when you submit a job with 100 dependent tasks.”
    • Answer: (Trace through your implementation step by step)
  7. “How do you prevent a slow task from blocking the entire job?”
    • Answer: Speculative execution, task splitting, timeout with retry on different worker
  8. “What’s the difference between this and Kubernetes?”
    • Answer: K8s schedules long-running containers; this schedules short-lived tasks with dependencies

Books That Will Help

Topic                             Book                                                          Chapters
Distributed Systems Fundamentals  “Designing Data-Intensive Applications” by Martin Kleppmann  Part II (entire)
Consensus and Replication         “Designing Data-Intensive Applications” by Martin Kleppmann  Chapters 5, 9
Failure Detection                 “Distributed Systems” by Tanenbaum                            Chapter 8
gRPC and Protobuf                 gRPC documentation                                            Core concepts, C++ tutorial
MapReduce Pattern                 Original MapReduce paper (Dean & Ghemawat)                    Entire paper
Thread Pool Implementation        “C++ Concurrency in Action” by Anthony Williams               Chapter 9

Implementation Phases

Phase 1: Foundation (Week 1-2)

Goal: Basic coordinator-worker communication

Tasks:

  1. Define protobuf messages and gRPC service
  2. Implement coordinator gRPC server skeleton
  3. Implement worker registration
  4. Test: Worker can connect and register

Success criteria: ./dtask worker connects to ./dtask coordinator and appears in worker list

Phase 2: Basic Scheduling (Week 2-3)

Goal: Execute independent tasks

Tasks:

  1. Implement job submission (no dependencies yet)
  2. Implement simple round-robin scheduling
  3. Implement task execution in worker thread pool
  4. Implement result reporting

Success criteria: Submit 100 independent tasks, all complete successfully

Phase 3: Dependency Graph (Week 3-4)

Goal: Execute tasks with dependencies

Tasks:

  1. Implement DAG data structure
  2. Add dependency tracking to job manager
  3. Compute ready tasks based on completed dependencies
  4. Test topological execution order

Success criteria: Diamond dependency pattern (A -> B, A -> C, B+C -> D) executes correctly

Phase 4: Heartbeats and Failure Detection (Week 4-5)

Goal: Detect and handle worker failures

Tasks:

  1. Implement heartbeat protocol
  2. Implement heartbeat timeout detection
  3. Implement task reassignment on worker failure
  4. Test failure scenarios

Success criteria: Kill a worker mid-execution, tasks get rescheduled and complete

Phase 5: Retry Logic and Timeouts (Week 5-6)

Goal: Handle task-level failures

Tasks:

  1. Implement task timeout detection
  2. Implement retry logic with backoff
  3. Implement max retry limits
  4. Handle permanently failed tasks

Success criteria: Simulate task failures, retries happen, eventually succeeds or fails cleanly
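
One way to compute the retry delay is exponential backoff with full jitter; the constants below are illustrative, not part of the specification:

#include <algorithm>
#include <chrono>
#include <random>

// Delay before retry number `retry_count` (1, 2, 3, ...): grows exponentially,
// is capped, then jittered to avoid synchronized retry storms.
std::chrono::milliseconds retry_delay(int retry_count) {
    using std::chrono::milliseconds;
    const milliseconds base{500};
    const milliseconds cap{30'000};
    const auto uncapped = base * (1LL << std::min(retry_count, 6));
    const auto capped = std::min<milliseconds>(uncapped, cap);
    static thread_local std::mt19937_64 rng{std::random_device{}()};
    std::uniform_int_distribution<long long> jitter(0, capped.count());
    return milliseconds{jitter(rng)};       // "full jitter": uniform in [0, capped]
}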

Phase 6: Result Aggregation (Week 6-7)

Goal: Collect and return job results

Tasks:

  1. Implement result storage in coordinator
  2. Implement result aggregation for completed jobs
  3. Implement client notification
  4. Test full job lifecycle

Success criteria: Submit MapReduce-style job, get aggregated result

Phase 7: Polish and Performance (Week 7-10)

Goal: Production-quality system

Tasks:

  1. Add comprehensive logging
  2. Add metrics (task throughput, latency percentiles)
  3. Optimize hot paths
  4. Stress testing with many workers and tasks
  5. Documentation

Success criteria: 10+ workers, 10,000+ tasks, <100ms P99 scheduling latency

Key Implementation Decisions

Decision 1: Push vs Pull Model

PUSH Model:
  Coordinator actively sends tasks to workers
  + Coordinator has full control
  + Lower latency for new tasks
  - Requires tracking worker capacity accurately
  - More complex failure handling

PULL Model:
  Workers request tasks when idle
  + Simpler coordinator logic
  + Natural load balancing
  + Workers control their own pace
  - Potentially higher latency (poll interval)
  - Empty polls waste resources

Recommendation: Start with PULL for simplicity

Decision 2: Synchronous vs Async gRPC

Synchronous:
  - Simpler to implement
  - Each RPC blocks a thread
  - Limited scalability

Asynchronous:
  - More complex (completion queues)
  - Single thread handles many connections
  - Better scalability

Recommendation: Start synchronous, optimize later if needed

Decision 3: State Persistence

In-Memory Only:
  - Simple, fast
  - State lost on coordinator crash
  - Good for development/testing

Persistent (SQLite/RocksDB):
  - State survives restarts
  - More complex
  - Required for production

Recommendation: Start in-memory, add persistence in Phase 7
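
Persistence does not have to start with a database. A minimal sketch of an append-only journal the coordinator could replay on restart (the class and record format are illustrative):

#include <fstream>
#include <string>

// Append-only journal of coordinator state changes. On restart, the
// coordinator re-reads the file and replays each line to rebuild job,
// task, and assignment state.
class StateJournal {
public:
    explicit StateJournal(const std::string& path)
        : out_(path, std::ios::app) {}

    // One line per state change, e.g. "job-001 task-03 RUNNING worker-2".
    void record(const std::string& entry) {
        out_ << entry << '\n';
        out_.flush();   // flush per record trades throughput for durability; batch in practice
    }

private:
    std::ofstream out_;
};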

Testing Strategy

Unit Tests

// DAG tests
TEST(TaskDAG, ReadyTasksWithNoDeps) {
    TaskDAG dag;
    dag.add_task("A", {});
    dag.add_task("B", {});
    dag.add_task("C", {});

    auto ready = dag.get_ready_tasks();
    EXPECT_EQ(ready.size(), 3);
}

TEST(TaskDAG, ReadyTasksWithDeps) {
    TaskDAG dag;
    dag.add_task("A", {});
    dag.add_task("B", {"A"});
    dag.add_task("C", {"A"});
    dag.add_task("D", {"B", "C"});

    // Initially only A is ready
    auto ready = dag.get_ready_tasks();
    EXPECT_EQ(ready.size(), 1);
    EXPECT_EQ(ready[0], "A");

    // After A completes, B and C become ready
    dag.complete_task("A");
    ready = dag.get_ready_tasks();
    EXPECT_EQ(ready.size(), 2);

    // After B and C complete, D becomes ready
    dag.complete_task("B");
    dag.complete_task("C");
    ready = dag.get_ready_tasks();
    EXPECT_EQ(ready.size(), 1);
    EXPECT_EQ(ready[0], "D");
}

TEST(TaskDAG, CycleDetection) {
    TaskDAG dag;
    dag.add_task("A", {"B"});
    dag.add_task("B", {"A"});

    EXPECT_TRUE(dag.has_cycles());
}

// Scheduler tests
TEST(Scheduler, LeastLoadedWorkerSelection) {
    WorkerLoadBalancer lb;
    lb.add_worker("w1", 4);
    lb.add_worker("w2", 8);
    lb.set_strategy(WorkerLoadBalancer::Strategy::LeastLoaded);

    // w2 has more capacity
    lb.update_capacity("w1", 2);
    lb.update_capacity("w2", 6);

    auto selected = lb.select_worker(Task{});
    EXPECT_EQ(selected, "w2");
}

// State machine tests
TEST(TaskStateMachine, ValidTransitions) {
    Task task;
    task.state = TaskState::PENDING;

    EXPECT_TRUE(task.can_transition_to(TaskState::ASSIGNED));
    EXPECT_FALSE(task.can_transition_to(TaskState::COMPLETED));
}

Integration Tests

TEST(Integration, BasicJobExecution) {
    // Start coordinator in background
    auto coordinator = std::make_unique<Coordinator>("localhost:8080");
    std::thread coord_thread([&] { coordinator->run(); });

    // Start worker
    auto worker = std::make_unique<Worker>("localhost:8080", 4, "test-worker");
    std::thread worker_thread([&] { worker->run(); });

    // Wait for registration
    std::this_thread::sleep_for(100ms);

    // Submit job
    JobSubmission job;
    job.set_name("test-job");
    auto* task = job.add_tasks();
    task->set_id("task-1");
    task->set_type("echo");

    auto client = CreateClient("localhost:8080");
    auto response = client->SubmitJob(job);
    EXPECT_TRUE(response.ok());

    // Wait for completion
    std::this_thread::sleep_for(1s);

    // Check status
    auto status = client->GetJobStatus(response.job_id());
    EXPECT_EQ(status.state(), JobState::COMPLETED);

    // Cleanup
    coordinator->shutdown();
    worker->shutdown();
    coord_thread.join();
    worker_thread.join();
}

TEST(Integration, WorkerFailureRecovery) {
    auto coordinator = start_coordinator();

    // Start two workers
    auto worker1 = start_worker("worker-1");
    auto worker2 = start_worker("worker-2");

    // Submit job with long-running tasks
    auto job = submit_long_running_job(10);  // 10 tasks, each takes 5s

    // Wait for tasks to start
    std::this_thread::sleep_for(500ms);

    // Kill worker1
    worker1->crash();

    // Wait for job completion
    wait_for_job_completion(job.id(), 60s);

    // Verify all tasks completed (some retried on worker2)
    auto status = get_job_status(job.id());
    EXPECT_EQ(status.state(), JobState::COMPLETED);
    EXPECT_GE(status.retry_count(), 1);  // At least one retry

    cleanup();
}

TEST(Integration, DAGExecution) {
    auto coordinator = start_coordinator();
    auto worker = start_worker("worker-1");

    // Submit diamond DAG: A -> B, A -> C, B+C -> D
    JobSubmission job;
    add_task(job, "A", {});
    add_task(job, "B", {"A"});
    add_task(job, "C", {"A"});
    add_task(job, "D", {"B", "C"});

    auto response = submit_job(job);

    // Track execution order
    std::vector<std::string> execution_order;

    wait_for_job_completion(response.job_id(), 10s);

    // Verify order respects dependencies
    auto status = get_job_status(response.job_id());
    EXPECT_TRUE(completed_before(status, "A", "B"));
    EXPECT_TRUE(completed_before(status, "A", "C"));
    EXPECT_TRUE(completed_before(status, "B", "D"));
    EXPECT_TRUE(completed_before(status, "C", "D"));
}

Stress Tests

TEST(Stress, ManyTasks) {
    auto coordinator = start_coordinator();

    // Start 10 workers with 8 threads each
    std::vector<std::unique_ptr<Worker>> workers;
    for (int i = 0; i < 10; ++i) {
        workers.push_back(start_worker("worker-" + std::to_string(i)));
    }

    // Submit 10,000 independent tasks
    JobSubmission job;
    for (int i = 0; i < 10000; ++i) {
        add_task(job, "task-" + std::to_string(i), {});
    }

    auto start = std::chrono::steady_clock::now();
    auto response = submit_job(job);
    wait_for_job_completion(response.job_id(), 60s);
    auto end = std::chrono::steady_clock::now();

    auto status = get_job_status(response.job_id());
    EXPECT_EQ(status.state(), JobState::COMPLETED);

    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
    std::cout << "10,000 tasks completed in " << duration.count() << "ms\n";
    std::cout << "Throughput: " << 10000.0 / (duration.count() / 1000.0) << " tasks/sec\n";

    // Should complete in reasonable time with 80 threads
    EXPECT_LT(duration.count(), 30000);  // 30 seconds max
}

TEST(Stress, ChaosMonkey) {
    // Run for 60 seconds with random failures
    auto coordinator = start_coordinator();

    // Start/stop workers randomly
    std::atomic<bool> stop{false};
    std::thread chaos_thread([&] {
        while (!stop) {
            auto action = random_action();
            switch (action) {
                case Action::StartWorker:
                    start_random_worker();
                    break;
                case Action::KillWorker:
                    kill_random_worker();
                    break;
                case Action::NetworkPartition:
                    partition_random_worker();
                    break;
            }
            std::this_thread::sleep_for(random_interval(100ms, 1s));
        }
    });

    // Submit jobs continuously
    std::atomic<int> completed_jobs{0};
    std::atomic<int> failed_jobs{0};

    for (int i = 0; i < 100; ++i) {
        auto response = submit_random_job();
        auto status = wait_for_job(response.job_id(), 30s);
        if (status.state() == JobState::COMPLETED) {
            completed_jobs++;
        } else {
            failed_jobs++;
        }
    }

    stop = true;
    chaos_thread.join();

    std::cout << "Completed: " << completed_jobs << ", Failed: " << failed_jobs << "\n";

    // Most jobs should complete despite chaos
    EXPECT_GT(completed_jobs.load(), 80);
}

Common Pitfalls & Debugging

Pitfall 1: Race Condition in Task Assignment

Symptom: Same task assigned to multiple workers, or task state corrupted.

Cause: Multiple scheduler threads or RPCs modifying task state concurrently.

Solution:

// Use mutex for task state modifications
std::unique_lock lock(task_mutex_);
if (task.state == TaskState::PENDING) {
    task.state = TaskState::ASSIGNED;
    task.assigned_worker = worker_id;
    task.assigned_at = now();
}
// Release lock before network call
lock.unlock();
send_assignment_to_worker(...);

Pitfall 2: Heartbeat False Positives

Symptom: Workers marked as dead even though they’re running fine.

Cause: Network delay or coordinator overload causing missed heartbeat processing.

Solution:

// Use multiple missed heartbeats before declaring dead
const int MISS_THRESHOLD = 3;

void check_worker_health() {
    for (auto& [id, worker] : workers_) {
        auto elapsed = now() - worker.last_heartbeat;
        if (elapsed > heartbeat_interval_ * MISS_THRESHOLD) {
            // Missed 3 heartbeats, consider dead
            handle_worker_failure(id);
        }
    }
}

Pitfall 3: Task Stuck in RUNNING Forever

Symptom: Task shows as RUNNING but worker is dead.

Cause: Worker crashed after acknowledging task but before completing.

Solution:

// Check both heartbeat AND task timeout
void check_running_tasks() {
    for (auto& [job_id, job] : jobs_) {
        for (auto& [task_id, task] : job.tasks) {
            if (task.state == TaskState::RUNNING) {
                auto elapsed = now() - task.assigned_at;
                if (elapsed > task_timeout_) {
                    // Task exceeded timeout
                    LOG("Task {} timed out", task_id);
                    reschedule_task(task);
                }
            }
        }
    }
}

Pitfall 4: Deadlock in Dependency Resolution

Symptom: Job never completes even though no tasks are running.

Cause: Circular dependency or dependency on non-existent task.

Solution:

// Validate DAG on job submission
bool validate_job(const Job& job) {
    // Check for cycles
    if (job.dag.has_cycles()) {
        return false;
    }

    // Check all dependencies exist
    for (const auto& [task_id, task] : job.tasks) {
        for (const auto& dep : task.dependencies) {
            if (job.tasks.find(dep) == job.tasks.end()) {
                LOG("Task {} depends on non-existent task {}", task_id, dep);
                return false;
            }
        }
    }

    return true;
}
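
The has_cycles() call above carries most of the weight. One common way to implement it is Kahn's algorithm: repeatedly strip nodes with no remaining incoming edges, and if anything is left over, it sits on a cycle. A minimal sketch, assuming the DAG is stored as a map from task id to the ids of its dependents (adapt to your own graph type):

// Minimal sketch of cycle detection with Kahn's algorithm. The adjacency map
// (task id -> ids of tasks that depend on it) is an assumption about storage.
#include <queue>
#include <string>
#include <unordered_map>
#include <vector>

bool has_cycles(const std::unordered_map<std::string, std::vector<std::string>>& adj) {
    // Count incoming edges for every node
    std::unordered_map<std::string, int> in_degree;
    for (const auto& [node, outgoing] : adj) {
        in_degree.try_emplace(node, 0);
        for (const auto& next : outgoing) {
            in_degree[next]++;
        }
    }

    // Seed with nodes that have no incoming edges
    std::queue<std::string> ready;
    for (const auto& [node, degree] : in_degree) {
        if (degree == 0) ready.push(node);
    }

    // Peel nodes off layer by layer
    size_t visited = 0;
    while (!ready.empty()) {
        auto node = ready.front();
        ready.pop();
        ++visited;
        if (auto it = adj.find(node); it != adj.end()) {
            for (const auto& next : it->second) {
                if (--in_degree[next] == 0) ready.push(next);
            }
        }
    }

    // Any node never visited is part of a cycle
    return visited != in_degree.size();
}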

Pitfall 5: Memory Leak from Unfinished Jobs

Symptom: Coordinator memory grows over time.

Cause: Completed/failed jobs not being cleaned up.

Solution:

// Clean up old jobs periodically
void cleanup_old_jobs() {
    auto cutoff = now() - retention_period_;

    std::unique_lock lock(jobs_mutex_);
    for (auto it = jobs_.begin(); it != jobs_.end(); ) {
        if (it->second.completed_at < cutoff &&
            (it->second.state == JobState::COMPLETED ||
             it->second.state == JobState::FAILED)) {
            it = jobs_.erase(it);
        } else {
            ++it;
        }
    }
}
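
Something has to invoke this cleanup on a schedule. A small background thread is enough; a minimal sketch using a C++20 std::jthread member (the one-minute interval is arbitrary):

// Minimal sketch: coordinator member that runs cleanup_old_jobs() periodically
// until shutdown. A condition variable wait would make shutdown faster than
// sleeping in one-minute chunks; kept simple here.
std::jthread cleanup_thread_{[this](std::stop_token st) {
    while (!st.stop_requested()) {
        cleanup_old_jobs();
        std::this_thread::sleep_for(std::chrono::minutes(1));
    }
}};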

Pitfall 6: Thundering Herd on Coordinator Restart

Symptom: Coordinator overwhelmed when restarting with many workers.

Cause: All workers try to re-register simultaneously.

Solution:

// Workers: Jittered reconnection
void Worker::reconnect() {
    auto jitter = random_interval(0ms, 5000ms);
    std::this_thread::sleep_for(jitter);
    connect_to_coordinator();
}

// Coordinator: Rate limit registrations
class RegistrationLimiter {
    std::atomic<int> pending_{0};
    static constexpr int MAX_CONCURRENT = 10;

public:
    bool try_acquire() {
        int expected = pending_.load();
        while (expected < MAX_CONCURRENT) {
            if (pending_.compare_exchange_weak(expected, expected + 1)) {
                return true;
            }
        }
        return false;  // Reject, try again later
    }

    void release() { pending_.fetch_sub(1); }
};
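
Wired into the registration RPC, the limiter turns a stampede into quick rejections that the jittered reconnect absorbs. A sketch of the handler (the RegisterRequest/RegisterResponse fields and the add_worker helper are assumptions about your own proto and coordinator, not a fixed API):

// Hedged sketch: reject excess registrations with RESOURCE_EXHAUSTED so the
// worker backs off and retries with jitter. Message fields and add_worker()
// are illustrative.
grpc::Status Coordinator::RegisterWorker(grpc::ServerContext* /*ctx*/,
                                         const RegisterRequest* request,
                                         RegisterResponse* response) {
    if (!registration_limiter_.try_acquire()) {
        return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
                            "too many concurrent registrations, retry later");
    }
    add_worker(request->worker_id(), request->capacity());
    registration_limiter_.release();
    response->set_accepted(true);
    return grpc::Status::OK;
}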

Extensions & Challenges

Extension 1: Priority Scheduling

Add task priorities with preemption:

enum class Priority { Low, Normal, High, Critical };

// Priority queue for ready tasks
struct PriorityComparator {
    bool operator()(const Task& a, const Task& b) {
        return a.priority < b.priority;  // Higher priority first
    }
};

std::priority_queue<Task, std::vector<Task>, PriorityComparator> ready_queue_;
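
The queue only decides which ready task is dispatched next; preemption is the second half. One hedged sketch (has_idle_capacity, running_tasks_, and send_cancel_to_worker are illustrative names): when a Critical task arrives and no capacity is free, cancel the lowest-priority running task and push it back onto the ready queue:

// Hedged sketch of preemption for Critical tasks. Helper and member names are
// illustrative; call this under the coordinator's scheduling lock.
void maybe_preempt(const Task& incoming) {
    if (incoming.priority != Priority::Critical) return;
    if (has_idle_capacity()) return;  // Normal scheduling will place it

    // Pick the lowest-priority running task as the victim
    auto victim = std::min_element(
        running_tasks_.begin(), running_tasks_.end(),
        [](const Task& a, const Task& b) { return a.priority < b.priority; });

    if (victim != running_tasks_.end() && victim->priority < incoming.priority) {
        send_cancel_to_worker(victim->assigned_worker, victim->id);
        victim->state = TaskState::PENDING;   // Goes back to the ready queue
        ready_queue_.push(*victim);
        running_tasks_.erase(victim);
    }
}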

Extension 2: Work Stealing Across Workers

When one worker is idle while another is overloaded:

// Worker requests a task from the coordinator.
// If the coordinator has none queued, it points the worker at a busy peer to steal from.
message StealRequest {
    string worker_id = 1;
}

message StealResponse {
    oneof result {
        TaskAssignment task = 1;
        string steal_from_worker = 2;  // Address of peer to steal from
        bool no_work_available = 3;
    }
}
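
On the worker side this becomes a small loop in the idle path. A sketch under assumed helper names (request_steal wraps the RPC to the coordinator, steal_from_peer contacts the peer directly, run executes an assignment):

// Hedged sketch of the worker's idle loop for work stealing. request_steal,
// steal_from_peer, and run are assumed wrappers, not generated gRPC calls.
void Worker::idle_loop() {
    using namespace std::chrono_literals;
    while (running_) {
        StealResponse resp = request_steal(my_id_);

        if (resp.has_task()) {
            run(resp.task());                       // Coordinator had work for us
        } else if (!resp.steal_from_worker().empty()) {
            // Coordinator pointed us at an overloaded peer
            if (auto task = steal_from_peer(resp.steal_from_worker())) {
                run(*task);
            }
        } else {
            std::this_thread::sleep_for(200ms);     // No work anywhere; back off
        }
    }
}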

Extension 3: Coordinator High Availability

Implement leader election with Raft:

class RaftCoordinator : public Coordinator {
    // Raft state
    enum class State { Follower, Candidate, Leader };
    State state_ = State::Follower;
    int current_term_ = 0;
    std::optional<std::string> voted_for_;
    std::vector<LogEntry> log_;

    // Replicate all state changes through Raft
    void replicate_state_change(const StateChange& change);

    // Election
    void start_election();
    void become_leader();
    void become_follower(int term);
};
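
The piece that makes this work in practice is the randomized election timeout: a follower that hears nothing from a leader for a random interval (roughly 150-300 ms in the Raft paper) becomes a candidate. A simplified sketch of that timer; full Raft also needs persistent state, vote counting, and log replication:

// Simplified sketch of the election timer. received_heartbeat_ is assumed to
// be set by the heartbeat/AppendEntries handler; mutex_ and heartbeat_cv_ are
// illustrative members.
void RaftCoordinator::election_timer_loop() {
    using namespace std::chrono_literals;
    while (running_) {
        auto timeout = random_interval(150ms, 300ms);  // Randomized, per the Raft paper

        std::unique_lock lock(mutex_);
        bool heard = heartbeat_cv_.wait_for(lock, timeout,
                                            [&] { return received_heartbeat_; });

        if (state_ == State::Leader) continue;   // Leaders send heartbeats, never time out
        if (heard) {
            received_heartbeat_ = false;         // Reset and keep following
        } else {
            start_election();                    // Timed out: become a candidate
        }
    }
}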

Extension 4: Speculative Execution

Run slow tasks on multiple workers:

// If task is taking too long, speculatively run on another worker
void check_for_speculation() {
    for (const auto& task : running_tasks_) {
        auto elapsed = now() - task.started_at;
        auto expected = estimate_task_duration(task);

        if (elapsed > expected * SPECULATION_THRESHOLD) {
            // Task is slow, start speculative copy
            assign_speculative_task(task);
        }
    }
}

// When any copy completes, cancel the others
void on_task_complete(const std::string& task_id) {
    cancel_speculative_copies(task_id);
    // ... rest of completion logic
}
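
The two helpers referenced above need a little bookkeeping: remember every worker running a copy of a task so the losers can be cancelled. A hedged sketch (pick_idle_worker and send_cancel_to_worker are illustrative):

// Hedged sketch: track which workers hold copies of each task so that when the
// first copy finishes, the remaining ones can be cancelled.
std::unordered_map<std::string, std::vector<std::string>> copies_;  // task_id -> worker ids

void assign_speculative_task(const Task& task) {
    auto worker = pick_idle_worker();          // Returns an optional worker (assumed)
    if (!worker) return;                       // No spare capacity; skip speculation
    send_assignment_to_worker(worker->id, task);
    copies_[task.id].push_back(worker->id);
}

void cancel_speculative_copies(const std::string& task_id) {
    for (const auto& worker_id : copies_[task_id]) {
        send_cancel_to_worker(worker_id, task_id);   // Cancelling a finished copy is a no-op
    }
    copies_.erase(task_id);
}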

Extension 5: Resource Constraints

Schedule based on memory/CPU requirements:

struct TaskRequirements {
    int cpu_cores = 1;
    size_t memory_mb = 512;
    size_t disk_mb = 0;
    std::vector<std::string> labels;  // e.g., "gpu", "ssd"
};

struct WorkerResources {
    int total_cores;
    int available_cores;
    size_t total_memory_mb;
    size_t available_memory_mb;
    std::set<std::string> labels;
};

bool can_schedule(const Task& task, const Worker& worker) {
    return task.requirements.cpu_cores <= worker.available_cores &&
           task.requirements.memory_mb <= worker.available_memory_mb &&
           has_all_labels(task.requirements.labels, worker.labels);
}
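
can_schedule only answers feasibility; the coordinator also has to reserve capacity when it assigns a task and give it back on completion or failure, otherwise available_cores and available_memory_mb drift. A minimal sketch of that bookkeeping (call it under the lock that protects worker state; worker.resources is an assumed member):

// Minimal sketch: keep available capacity honest across assign/complete/fail.
void reserve(WorkerResources& res, const TaskRequirements& req) {
    res.available_cores -= req.cpu_cores;
    res.available_memory_mb -= req.memory_mb;
}

void release(WorkerResources& res, const TaskRequirements& req) {
    res.available_cores += req.cpu_cores;
    res.available_memory_mb += req.memory_mb;
}

// On assignment:
//   if (can_schedule(task, worker)) {
//       reserve(worker.resources, task.requirements);
//       send_assignment_to_worker(worker, task);
//   }
// On completion or failure detection:
//   release(worker.resources, task.requirements);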

Challenge: Exactly-Once Execution

Guarantee that each task's effects are applied exactly once, even when the task is retried:

// Use idempotency keys
struct Task {
    std::string id;
    std::string idempotency_key;  // Stable across retries of the same task
    // ...
};

// Workers check whether a result already exists before re-executing.
// TaskResult stands in for whatever result type your workers produce.
TaskResult execute_task(const Task& task) {
    // Check result cache
    if (auto cached = result_cache_.get(task.idempotency_key)) {
        // Already executed, return cached result
        return *cached;
    }

    // Execute
    auto result = run_task(task);

    // Cache result atomically so a concurrent retry sees it
    result_cache_.put(task.idempotency_key, result);

    return result;
}

Real-World Connections

How Production Systems Solve This

System Architecture Key Innovation
Kubernetes Controller pattern with etcd Declarative state reconciliation
Apache YARN ResourceManager + NodeManager Container-based isolation
Mesos Two-level scheduling Resource offers
Temporal Event sourcing Durable execution, replay
Airflow DAG-based scheduling Declarative workflows
Spark Driver + Executors Resilient distributed datasets

Industry Practices

  1. Observability: Distributed tracing (Jaeger), metrics (Prometheus), logging (ELK)
  2. Configuration: Service discovery (Consul), config management (etcd)
  3. Communication: gRPC for internal, REST for external
  4. Testing: Chaos engineering (Chaos Monkey), contract testing
  5. Deployment: Blue-green, canary releases

Where This Project Leads

  • Platform Engineering: Build internal developer platforms
  • Distributed Systems Engineer: Design scalable systems
  • SRE/Infrastructure: Operate and improve distributed systems
  • Technical interviews: Strong distributed systems fundamentals

Resources

Essential Reading

  1. “Designing Data-Intensive Applications” by Martin Kleppmann
    • Chapters 5, 8, 9: Replication, fault tolerance, consistency
  2. “MapReduce: Simplified Data Processing on Large Clusters” (Dean & Ghemawat)
    • Original paper on distributed batch processing
  3. “In Search of an Understandable Consensus Algorithm” (Raft paper)
    • Diego Ongaro, John Ousterhout
  4. gRPC Documentation
    • C++ quickstart, concepts, best practices

Code to Study

Project Language Notes
Ray Python/C++ Distributed computing framework
Temporal Go Workflow orchestration
Dask Python Distributed computing scheduler
etcd Go Distributed key-value store
grpc C++ gRPC reference implementation

Tools

  • gRPC: Service communication
  • Protobuf: Serialization
  • Prometheus: Metrics collection
  • Grafana: Metrics visualization
  • Jaeger: Distributed tracing
  • ThreadSanitizer: Race detection

Self-Assessment Checklist

Understanding

  • I can explain the difference between coordinator and worker responsibilities
  • I can describe the task state machine and all transitions
  • I can explain how DAG dependencies affect execution order
  • I can describe what happens when a worker fails mid-task
  • I understand the trade-offs between push and pull scheduling models
  • I can explain why heartbeats are necessary and their limitations
  • I understand idempotency and why it matters for retries

Implementation

  • Coordinator accepts gRPC connections from workers
  • Workers register and report capacity
  • Tasks are assigned to workers based on availability
  • Task results are reported back to coordinator
  • DAG dependencies are respected in execution order
  • Heartbeat timeout correctly detects worker failures
  • Failed tasks are rescheduled on different workers
  • Jobs complete successfully when all tasks finish

Performance

  • System handles 10+ workers simultaneously
  • Task scheduling latency is < 100ms P99
  • System recovers from worker failure in < 5 seconds
  • No memory leaks during extended operation
  • ThreadSanitizer reports no races

Testing

  • Unit tests for DAG, scheduler, state machine
  • Integration tests for happy path
  • Failure tests (worker crash, timeout)
  • Stress tests with many tasks
  • Chaos testing with random failures

Submission / Completion Criteria

Minimum Viable Completion

  • Coordinator and worker processes start and communicate
  • Workers register with coordinator
  • Tasks are submitted and executed on workers
  • Results are reported back to coordinator
  • Simple jobs complete successfully

Full Completion

  • Task dependencies (DAG) work correctly
  • Heartbeat-based failure detection
  • Task reassignment on worker failure
  • Retry logic with configurable limits
  • Task timeouts
  • Result aggregation
  • CLI for submission and status
  • 10+ workers, 1000+ tasks stress test passes

Excellence (Going Above & Beyond)

  • Priority scheduling
  • Work stealing across workers
  • Speculative execution
  • Metrics and monitoring
  • Coordinator persistence/recovery
  • Comprehensive chaos testing
  • Documentation and diagrams
  • Performance optimization (>10K tasks/sec throughput)

Building a distributed task scheduler teaches you the core challenges of distributed computing: coordination, failure handling, and consistency. After this project, you’ll understand how systems like Kubernetes, Airflow, and Spark work under the hood, and you’ll be prepared for infrastructure/platform engineering roles at any scale.


This guide was expanded from LEARN_CPP_CONCURRENCY_AND_PARALLELISM.md. For the complete learning path, see the project index.