Project 16: Distributed Task Scheduler
Build a distributed task execution system in which tasks can be submitted to any node and executed on any node with available capacity, with support for task dependencies, result aggregation, and failure recovery. This is the capstone project that combines all concurrency concepts: thread pools for local execution, async I/O for network communication, lock-free structures for high performance, and coordination primitives for distributed consensus.
Quick Reference
| Attribute | Value |
|---|---|
| Project Number | 16 of 17 |
| Category | C++ Concurrency Mastery |
| Difficulty | Level 5: Master |
| Time Estimate | 6-10 weeks |
| Main Programming Language | C++ |
| Alternative Languages | Go, Rust, Java |
| Coolness Level | Level 5: Pure Magic (Super Cool) |
| Business Potential | 5. The “Industry Disruptor” |
| Knowledge Area | Distributed Systems / Concurrency |
| Primary Tools | gRPC, Protobuf, C++20, ThreadSanitizer |
| Main Book | “Designing Data-Intensive Applications” by Martin Kleppmann |
Summary: Build a production-quality distributed task scheduler with coordinator-worker architecture, task dependency graphs (DAG execution), work stealing across network boundaries, failure recovery with retries and timeouts, and result aggregation. This project integrates every concurrency concept from the previous 15 projects into a cohesive distributed system.
Learning Objectives
By completing this project, you will be able to:
- Design distributed system architecture with coordinator-worker topology, clear separation of concerns, and well-defined communication protocols
- Implement network communication using gRPC or raw sockets with proper serialization, connection management, and error handling
- Build task dependency graphs (DAGs) with topological sorting, dependency resolution, and parallel execution of independent tasks
- Handle distributed failures including network partitions, worker crashes, task timeouts, and partial failures with retry logic
- Coordinate distributed state using leader election, heartbeats, and consensus-lite protocols
- Aggregate results from distributed workers using map-reduce patterns and streaming aggregation
- Apply all previous concurrency concepts: thread pools (P03), lock-free queues (P06), async I/O (P11), and actors (P15) in a real distributed context
- Answer interview questions about distributed systems, consensus, fault tolerance, and large-scale system design
Theoretical Foundation
Core Concepts
The Distributed Task Scheduling Problem
You have N worker machines. You want to execute M tasks (where M » N) as efficiently as possible. Tasks may have dependencies (task B requires output of task A). Workers may fail. Network may partition. How do you coordinate this?
THE PROBLEM SPACE
+------------------+ +------------------+
| Task Graph | | Worker Pool |
| | | |
| A ──► B | | Worker-1 (4c) |
| │ │ | ? | Worker-2 (8c) |
| ▼ ▼ | ────► | Worker-3 (4c) |
| C ──► D | | Worker-4 (8c) |
| │ | | |
| ▼ | | (Workers may |
| E | | fail at any |
| | | time!) |
+------------------+ +------------------+
Questions you must answer:
1. Which tasks can run in parallel? (Only A can run at first; once A completes, B and C can run in parallel)
2. Which worker runs each task? (Load balancing)
3. What happens if Worker-2 crashes while running B? (Failure recovery)
4. How do results flow back and aggregate? (Data movement)
5. How does the coordinator know worker health? (Heartbeats)
Coordinator-Worker Architecture
The classic architecture for distributed task execution:
COORDINATOR-WORKER ARCHITECTURE
┌─────────────────────┐
│ │
│ COORDINATOR │
│ │
│ ┌──────────────┐ │
Task Submission ───────► │ Task Queue │ │
│ │ [T1,T2,T3..] │ │
│ └──────────────┘ │
│ │ │
│ ┌──────▼───────┐ │
│ │ Scheduler │ │
│ │ │ │
│ │ • DAG state │ │
│ │ • Worker map │ │
│ │ • Assignments│ │
│ └──────────────┘ │
│ │ │
└─────────┼───────────┘
│
┌───────────────────┼───────────────────┐
│ │ │
┌────────▼────────┐ ┌───────▼────────┐ ┌───────▼────────┐
│ WORKER-1 │ │ WORKER-2 │ │ WORKER-3 │
│ │ │ │ │ │
│ ┌─────────────┐ │ │ ┌────────────┐ │ │ ┌────────────┐ │
│ │ ThreadPool │ │ │ │ ThreadPool │ │ │ │ ThreadPool │ │
│ │ (4 threads) │ │ │ │ (8 threads)│ │ │ │ (4 threads)│ │
│ └─────────────┘ │ │ └────────────┘ │ │ └────────────┘ │
│ │ │ │ │ │
│ ┌─────────────┐ │ │ ┌────────────┐ │ │ ┌────────────┐ │
│ │ Task Queue │ │ │ │ Task Queue │ │ │ │ Task Queue │ │
│ └─────────────┘ │ │ └────────────┘ │ │ └────────────┘ │
│ │ │ │ │ │
│ Heartbeat ──────┼─┼────────────────┼─┼────────────────┤
│ │ │ │ │ │
└─────────────────┘ └────────────────┘ └────────────────┘
Communication:
─────► Task assignment (coordinator → worker)
◄───── Result/status (worker → coordinator)
───────── Heartbeat (worker → coordinator, periodic)
Coordinator responsibilities:
- Accept task submissions from clients
- Track task state (pending, assigned, running, completed, failed)
- Manage task dependency graph (DAG)
- Assign tasks to workers based on capacity and locality
- Handle task failures (reassignment, retries)
- Aggregate results
Worker responsibilities:
- Register with coordinator on startup
- Report capacity (number of execution slots)
- Pull/receive tasks from coordinator
- Execute tasks using local thread pool
- Report results and failures
- Send periodic heartbeats
Task State Machine
Every task follows a well-defined state machine:
TASK STATE MACHINE
┌─────────────────────────────────────────────────────────────────┐
│ │
│ PENDING ────────────► ASSIGNED ────────────► RUNNING │
│ │ │ │ │
│ │ │ │ │
│ │ (worker crash) │ │ │
│ ◄──────────────────────┘ │ │
│ │ │ │
│ │ (timeout) │ │
│ ◄─────────────────────────────────────────────┘ │
│ │ │ │
│ │ ▼ │
│ │ COMPLETED │
│ │ │ │
│ │ │ │
│ │ ┌──────────────────────┘ │
│ │ │ │
│ │ ▼ │
│ │ AGGREGATED │
│ │ │
│ │ │
│ │ ┌──────────┐ │
│ │ │ FAILED │ │
│ └─── (max retries) ─┤ │ │
│ │ terminal │ │
│ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
State Transitions:
PENDING → ASSIGNED : Scheduler assigns task to worker
ASSIGNED → RUNNING : Worker acknowledges and starts execution
RUNNING → COMPLETED : Worker reports success with result
RUNNING → PENDING : Timeout (reschedule to different worker)
ASSIGNED → PENDING : Worker crash detected (heartbeat timeout)
* → FAILED : Max retries exceeded
COMPLETED → AGGREGATED: Result incorporated into job output
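These rules translate directly into code. Below is a minimal sketch of an explicit transition check, assuming illustrative names (the unit tests later call a member can_transition_to, which would wrap the same table):
enum class TaskState { PENDING, ASSIGNED, RUNNING, COMPLETED, AGGREGATED, FAILED };
// Sketch of the state machine above; "* -> FAILED" covers the max-retries case.
inline bool can_transition(TaskState from, TaskState to) {
    if (from == TaskState::FAILED || from == TaskState::AGGREGATED)
        return false;                                      // terminal states
    if (to == TaskState::FAILED)
        return true;                                       // any active state may fail permanently
    switch (from) {
        case TaskState::PENDING:   return to == TaskState::ASSIGNED;
        case TaskState::ASSIGNED:  return to == TaskState::RUNNING   || to == TaskState::PENDING;  // worker crash
        case TaskState::RUNNING:   return to == TaskState::COMPLETED || to == TaskState::PENDING;  // timeout
        case TaskState::COMPLETED: return to == TaskState::AGGREGATED;
        default:                   return false;
    }
}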
Dependency Graph (DAG) Execution
Tasks form a Directed Acyclic Graph (DAG) based on their dependencies:
DAG EXECUTION MODEL
Job Definition:
┌─────────────────────────────────────────────────────────────┐
│ Task A: Download file │
│ Task B: Parse JSON (depends on A) │
│ Task C: Validate schema (depends on A) │
│ Task D: Transform data (depends on B, C) │
│ Task E: Upload result (depends on D) │
└─────────────────────────────────────────────────────────────┘
Visual Representation:
┌───────┐
│ A │ Level 0 (no dependencies)
└───┬───┘
│
┌───────────┴───────────┐
│ │
┌───▼───┐ ┌───▼───┐
│ B │ │ C │ Level 1 (depend on A)
└───┬───┘ └───┬───┘
│ │
└───────────┬───────────┘
│
┌───▼───┐
│ D │ Level 2 (depends on B and C)
└───┬───┘
│
┌───▼───┐
│ E │ Level 3 (depends on D)
└───────┘
Execution Timeline:
──────────────────────────────────────────────────────────────►
Time 0 1 2 3 4 5
A [======]
B [=====]
C [===]
D [========]
E [====]
Note: B and C run in parallel because they only depend on A
(they don't depend on each other)
Key DAG operations:
- Topological sort: Determine execution order respecting dependencies
- Ready set computation: Find tasks whose dependencies are all complete
- Fan-out tracking: When task completes, update all dependents
- Critical path analysis: Identify the longest dependency chain
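All four operations fall out of one bookkeeping idea: keep an unmet-dependency counter per task. The sketch below shows Kahn's algorithm, which produces a topological order and doubles as the cycle check (if the output is shorter than the task count, a cycle exists); the container choices are illustrative.
#include <queue>
#include <string>
#include <unordered_map>
#include <vector>
// edges: task -> tasks that depend on it (fan-out); indegree: unmet dependencies per task.
std::vector<std::string> topological_order(
    const std::unordered_map<std::string, std::vector<std::string>>& edges,
    std::unordered_map<std::string, int> indegree) {
    std::queue<std::string> ready;
    for (const auto& [task, count] : indegree)
        if (count == 0) ready.push(task);                  // initial ready set (Level 0)
    std::vector<std::string> order;
    while (!ready.empty()) {
        std::string task = ready.front();
        ready.pop();
        order.push_back(task);
        auto it = edges.find(task);
        if (it == edges.end()) continue;
        for (const auto& dependent : it->second)           // fan-out: completion may free dependents
            if (--indegree[dependent] == 0) ready.push(dependent);
    }
    // If order.size() < indegree.size(), the leftover tasks form a cycle.
    return order;
}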
Why This Matters
Distributed task scheduling is the foundation of:
- MapReduce / Spark: Hadoop YARN, Spark scheduler
- CI/CD systems: GitHub Actions, Jenkins, CircleCI
- Workflow orchestration: Airflow, Prefect, Temporal
- Container orchestration: Kubernetes scheduler
- Distributed builds: Bazel remote execution, distcc
- Machine learning: Distributed training, hyperparameter search
Understanding this architecture lets you:
- Design scalable batch processing systems
- Build fault-tolerant job execution platforms
- Reason about distributed system failures
- Interview for infrastructure/platform roles
Historical Context
- 1990s: First distributed job schedulers (Condor, PBS) for HPC clusters
- 2004: MapReduce paper from Google defines modern distributed batch processing
- 2006: Hadoop open-sources MapReduce concepts
- 2010s: Container orchestration (Kubernetes) and workflow systems (Airflow)
- 2020s: Serverless task execution, edge computing coordination
Common Misconceptions
Misconception 1: “Just use a message queue” Reality: A message queue handles task delivery but not dependency graphs, result aggregation, failure recovery, or scheduling decisions. You need orchestration logic on top.
Misconception 2: “Distributed = just more threads” Reality: Network failures, partial failures, and ordering problems make distributed systems fundamentally different from concurrent programs.
Misconception 3: “Consensus is required for everything” Reality: Full consensus (Raft/Paxos) is expensive. Many systems use weaker consistency with application-level conflict resolution.
Misconception 4: “Retries solve all failures” Reality: Idempotency, exactly-once semantics, and side-effect management are hard problems that retries alone don’t solve.
Project Specification
What You Will Build
A distributed task scheduler with the following components:
- Coordinator Node (dtask coordinator)
  - Accepts task/job submissions via gRPC
  - Manages task state machine and dependency graph
  - Schedules tasks to workers based on capacity
  - Tracks worker health via heartbeats
  - Handles failure recovery and retries
  - Aggregates results
- Worker Node (dtask worker)
  - Registers with coordinator on startup
  - Runs local thread pool (from Project 3)
  - Executes assigned tasks
  - Reports completion, failure, and progress
  - Sends heartbeats to coordinator
- Client CLI (dtask submit, dtask status)
  - Submits jobs with task definitions and dependencies
  - Queries job/task status
  - Retrieves results
Functional Requirements
| ID | Requirement | Priority |
|---|---|---|
| FR1 | Submit jobs with multiple tasks and dependencies | Must |
| FR2 | Execute tasks on workers with available capacity | Must |
| FR3 | Track task state transitions correctly | Must |
| FR4 | Handle worker crashes with task reassignment | Must |
| FR5 | Respect task dependencies (DAG execution) | Must |
| FR6 | Aggregate results from completed tasks | Must |
| FR7 | Support task timeouts with configurable limits | Must |
| FR8 | Retry failed tasks with configurable retry count | Must |
| FR9 | Work stealing across workers (optional) | Should |
| FR10 | Support task priorities | Should |
| FR11 | Support task cancellation | Should |
| FR12 | Graceful coordinator failover (bonus) | Could |
Non-Functional Requirements
| Metric | Target | Description |
|---|---|---|
| Task Throughput | > 10,000/sec | Tasks scheduled per second |
| Task Latency (P50) | < 10ms | Time from ready to running |
| Task Latency (P99) | < 100ms | 99th percentile scheduling latency |
| Worker Recovery | < 5s | Time to detect failed worker and reschedule |
| Coordinator Recovery | < 30s | Time to recover coordinator state (bonus) |
| Scalability | 100+ workers | System should scale linearly |
| Memory Overhead | < 1KB/task | Per-task memory in coordinator |
Example Usage/Output
Terminal 1: Start Coordinator
$ ./dtask coordinator --port 8080 --heartbeat-interval 1s --task-timeout 60s
[Coordinator] Starting on port 8080
[Coordinator] Configuration:
Heartbeat interval: 1s
Task timeout: 60s
Max retries: 3
[Coordinator] Waiting for workers...
Terminal 2-4: Start Workers
$ ./dtask worker --coordinator localhost:8080 --threads 4 --name worker-1
[worker-1] Connecting to coordinator at localhost:8080...
[worker-1] Connected! Advertising 4 execution slots
[worker-1] Waiting for tasks...
$ ./dtask worker --coordinator localhost:8080 --threads 8 --name worker-2
[worker-2] Connected! Advertising 8 execution slots
$ ./dtask worker --coordinator localhost:8080 --threads 4 --name worker-3
[worker-3] Connected! Advertising 4 execution slots
Terminal 5: Submit Job
$ cat wordcount.yaml
job:
name: wordcount-example
tasks:
- id: map-001
type: map
input: s3://bucket/input/chunk-001.txt
output: /tmp/map-001.json
- id: map-002
type: map
input: s3://bucket/input/chunk-002.txt
output: /tmp/map-002.json
- id: map-003
type: map
input: s3://bucket/input/chunk-003.txt
output: /tmp/map-003.json
- id: reduce-001
type: reduce
dependencies: [map-001, map-002, map-003]
inputs: [/tmp/map-001.json, /tmp/map-002.json, /tmp/map-003.json]
output: /tmp/final-result.json
$ ./dtask submit --coordinator localhost:8080 --job wordcount.yaml
Submitting job: wordcount-example
Tasks: 4 (3 map + 1 reduce)
Dependencies: reduce-001 depends on [map-001, map-002, map-003]
[Coordinator] Job accepted, ID: job-20240115-001
[Coordinator] Scheduling 3 map tasks (no dependencies)...
Coordinator Output:
[Coordinator] Job job-20240115-001 accepted
[Coordinator] Task DAG analysis:
Level 0: map-001, map-002, map-003 (can run immediately)
Level 1: reduce-001 (depends on 3 tasks)
[Coordinator] Assigning map-001 to worker-1 (3/4 slots available)
[Coordinator] Assigning map-002 to worker-2 (7/8 slots available)
[Coordinator] Assigning map-003 to worker-3 (3/4 slots available)
[worker-1] Executing map-001...
[worker-2] Executing map-002...
[worker-3] Executing map-003...
[Coordinator] map-002 completed on worker-2 (1.2s)
[Coordinator] map-003 completed on worker-3 (1.4s)
[Coordinator] map-001 completed on worker-1 (1.8s)
[Coordinator] All dependencies for reduce-001 satisfied
[Coordinator] Assigning reduce-001 to worker-2 (8/8 slots, most capacity)
[worker-2] Executing reduce-001...
[Coordinator] reduce-001 completed on worker-2 (0.8s)
[Coordinator] Job job-20240115-001 complete!
Job Summary:
Total time: 2.6s
Tasks executed: 4
Task failures: 0
Worker utilization:
worker-1: 1 task (map-001)
worker-2: 2 tasks (map-002, reduce-001)
worker-3: 1 task (map-003)
Result: /tmp/final-result.json
Failure Scenario:
[worker-1] Executing map-001...
[worker-1] CRASHED!
[Coordinator] Heartbeat timeout from worker-1 (>3s)
[Coordinator] Marking worker-1 as DEAD
[Coordinator] Reassigning map-001 to worker-3
[worker-3] Executing map-001...
[Coordinator] map-001 completed on worker-3 (1.8s, retry #1)
[Coordinator] Job complete despite worker failure!
Task failures: 1 (recovered)
Real World Outcome
After implementing this project, you will have built a system similar to:
- Apache Mesos: Resource manager and task scheduler
- Kubernetes Job controller: Batch workload scheduling
- Celery: Distributed task queue for Python
- Dask distributed: Parallel computing scheduler
- Temporal/Cadence: Workflow orchestration engine
Your implementation will demonstrate:
- Distributed systems design skills
- Fault tolerance and recovery patterns
- State machine design
- Network programming with gRPC
- Integration of all concurrency concepts from previous projects
Solution Architecture
High-Level Design
┌─────────────────────────────────────────────────────────────────────────────┐
│ DISTRIBUTED TASK SCHEDULER │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ COORDINATOR │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ gRPC Server │ │ Job Manager │ │ Worker │ │ │
│ │ │ │ │ │ │ Registry │ │ │
│ │ │ • Submit() │ │ • Jobs map │ │ │ │ │
│ │ │ • Status() │ │ • Task DAG │ │ • Workers │ │ │
│ │ │ • Register() │ │ • State │ │ • Capacity │ │ │
│ │ │ • Heartbeat()│ │ machine │ │ • Health │ │ │
│ │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ │
│ │ │ │ │ │ │
│ │ └─────────────────┼─────────────────┘ │ │
│ │ │ │ │
│ │ ┌────────▼────────┐ │ │
│ │ │ Scheduler │ │ │
│ │ │ │ │ │
│ │ │ • Ready queue │ │ │
│ │ │ • Assignments │ │ │
│ │ │ • Load balance │ │ │
│ │ └────────┬────────┘ │ │
│ │ │ │ │
│ └────────────────────────────┼───────────────────────────────────────┘ │
│ │ │
│ gRPC (Task Assignment, Results) │
│ │ │
│ ┌──────────────────────┼──────────────────────┐ │
│ │ │ │ │
│ ┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐ │
│ │ WORKER-1 │ │ WORKER-2 │ │ WORKER-3 │ │
│ │ │ │ │ │ │ │
│ │ ┌───────┐ │ │ ┌───────┐ │ │ ┌───────┐ │ │
│ │ │Thread │ │ │ │Thread │ │ │ │Thread │ │ │
│ │ │Pool │ │ │ │Pool │ │ │ │Pool │ │ │
│ │ │(4 thr)│ │ │ │(8 thr)│ │ │ │(4 thr)│ │ │
│ │ └───────┘ │ │ └───────┘ │ │ └───────┘ │ │
│ │ │ │ │ │ │ │
│ │ ┌───────┐ │ │ ┌───────┐ │ │ ┌───────┐ │ │
│ │ │Task │ │ │ │Task │ │ │ │Task │ │ │
│ │ │Queue │ │ │ │Queue │ │ │ │Queue │ │ │
│ │ └───────┘ │ │ └───────┘ │ │ └───────┘ │ │
│ │ │ │ │ │ │ │
│ │ Heartbeat │ │ Heartbeat │ │ Heartbeat │ │
│ │ (1s) ───────────────────────────────────► Coordinator │
│ │ │ │ │ │ │ │
│ └───────────┘ └───────────┘ └───────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Key Components
1. Coordinator Components
// Job and Task representation
struct Task {
std::string id;
std::string job_id;
TaskType type;
std::vector<std::string> dependencies;
TaskState state;
std::optional<std::string> assigned_worker;
std::chrono::steady_clock::time_point assigned_at;
int retry_count;
std::any result;
std::any input;
};
struct Job {
std::string id;
std::string name;
std::unordered_map<std::string, Task> tasks;
std::unordered_map<std::string, std::vector<std::string>> dependency_graph;
std::unordered_map<std::string, std::vector<std::string>> reverse_deps;
JobState state;
std::chrono::steady_clock::time_point submitted_at;
std::chrono::steady_clock::time_point completed_at;
};
// Worker tracking
struct WorkerInfo {
std::string id;
std::string address;
int total_slots;
int available_slots;
std::chrono::steady_clock::time_point last_heartbeat;
WorkerState state;
std::vector<std::string> assigned_tasks;
};
// Main coordinator class
class Coordinator {
public:
// gRPC service methods
grpc::Status SubmitJob(const JobRequest& request, JobResponse* response);
grpc::Status GetJobStatus(const StatusRequest& request, StatusResponse* response);
grpc::Status RegisterWorker(const WorkerInfo& info, RegistrationResponse* response);
grpc::Status Heartbeat(const HeartbeatRequest& request, HeartbeatResponse* response);
grpc::Status ReportTaskComplete(const TaskResult& result, Ack* response);
private:
// Core state
std::unordered_map<std::string, Job> jobs_;
std::unordered_map<std::string, WorkerInfo> workers_;
std::priority_queue<TaskRef> ready_queue_;
// Synchronization
mutable std::shared_mutex jobs_mutex_;
mutable std::shared_mutex workers_mutex_;
// Background threads
std::thread scheduler_thread_;
std::thread heartbeat_checker_thread_;
// Internal methods
void schedule_loop();
void check_heartbeats();
std::optional<std::string> select_worker(const Task& task);
void handle_worker_failure(const std::string& worker_id);
void update_task_state(const std::string& job_id, const std::string& task_id, TaskState new_state);
std::vector<std::string> get_ready_tasks(const Job& job);
};
2. Worker Components
// Worker implementation using previous projects
class Worker {
public:
Worker(const std::string& coordinator_addr, int num_threads, const std::string& name);
void run(); // Main worker loop
void shutdown();
private:
// From Project 3: Work-stealing thread pool
std::unique_ptr<WorkStealingPool> thread_pool_;
// From Project 6: Lock-free task queue
LockFreeQueue<TaskAssignment> incoming_tasks_;
// gRPC client to coordinator
std::unique_ptr<CoordinatorClient> coordinator_;
// Worker state
std::string id_;
std::string name_;
std::atomic<int> available_slots_;
std::atomic<bool> shutdown_{false};
// Background threads
std::thread heartbeat_thread_;
std::thread task_receiver_thread_;
// Task execution
void execute_task(const TaskAssignment& task);
void report_result(const std::string& task_id, const TaskResult& result);
void send_heartbeat();
};
3. Communication Protocol (gRPC)
// scheduler.proto
syntax = "proto3";
package dtask;
service Coordinator {
// Job management
rpc SubmitJob(JobSubmission) returns (JobResponse);
rpc GetJobStatus(JobStatusRequest) returns (JobStatusResponse);
rpc CancelJob(CancelRequest) returns (CancelResponse);
// Worker management
rpc RegisterWorker(WorkerRegistration) returns (RegistrationResponse);
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
// Task execution
rpc GetTask(TaskRequest) returns (TaskAssignment);
rpc ReportTaskComplete(TaskResult) returns (Ack);
rpc ReportTaskFailed(TaskFailure) returns (Ack);
}
message JobSubmission {
string name = 1;
repeated TaskDefinition tasks = 2;
}
message TaskDefinition {
string id = 1;
string type = 2;
repeated string dependencies = 3;
bytes input = 4; // Serialized task input
}
message TaskAssignment {
string job_id = 1;
string task_id = 2;
string type = 3;
bytes input = 4;
int64 timeout_ms = 5;
}
message TaskResult {
string job_id = 1;
string task_id = 2;
bool success = 3;
bytes output = 4; // Serialized result
string error_message = 5;
int64 execution_time_ms = 6;
}
message HeartbeatRequest {
string worker_id = 1;
int32 available_slots = 2;
repeated string running_tasks = 3;
}
message HeartbeatResponse {
bool acknowledged = 1;
repeated string tasks_to_cancel = 2; // For cancellation support
}
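For orientation, here is a minimal client-side sketch against the code protoc generates from this service. It assumes the generated header name below and a JobResponse message with a job_id field (referenced elsewhere in this guide but not defined in the excerpt above):
#include <memory>
#include <grpcpp/grpcpp.h>
#include "scheduler.grpc.pb.h"   // generated from scheduler.proto
int main() {
    auto channel = grpc::CreateChannel("localhost:8080",
                                       grpc::InsecureChannelCredentials());
    std::unique_ptr<dtask::Coordinator::Stub> stub = dtask::Coordinator::NewStub(channel);
    dtask::JobSubmission job;
    job.set_name("example-job");
    auto* task = job.add_tasks();
    task->set_id("task-1");
    task->set_type("echo");
    grpc::ClientContext ctx;
    dtask::JobResponse response;                  // assumed message, see note above
    grpc::Status status = stub->SubmitJob(&ctx, job, &response);
    if (!status.ok()) {
        return 1;                                 // a real client would retry with backoff
    }
    // response.job_id() would identify the job for later status queries.
    return 0;
}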
Data Structures
Task Dependency Graph (DAG)
class TaskDAG {
public:
// Add task with dependencies
void add_task(const std::string& task_id, const std::vector<std::string>& deps);
// Get tasks with no unmet dependencies
std::vector<std::string> get_ready_tasks() const;
// Mark task as complete and update dependents
std::vector<std::string> complete_task(const std::string& task_id);
// Topological sort for execution order
std::vector<std::string> topological_sort() const;
// Check for cycles (validation)
bool has_cycles() const;
private:
// Adjacency list: task -> tasks that depend on it
std::unordered_map<std::string, std::vector<std::string>> forward_edges_;
// Reverse: task -> tasks it depends on
std::unordered_map<std::string, std::vector<std::string>> backward_edges_;
// Count of unmet dependencies per task
std::unordered_map<std::string, int> unmet_dep_count_;
// Set of completed tasks
std::unordered_set<std::string> completed_;
};
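One possible implementation of the core methods against these members, shown as a sketch (error handling for unknown task IDs is omitted; the coordinator still filters by task state, as the scheduling pseudocode does, since the DAG itself does not track assignment):
void TaskDAG::add_task(const std::string& task_id,
                       const std::vector<std::string>& deps) {
    backward_edges_[task_id] = deps;
    unmet_dep_count_[task_id] = static_cast<int>(deps.size());
    for (const auto& dep : deps)
        forward_edges_[dep].push_back(task_id);   // dep -> tasks waiting on it
}
std::vector<std::string> TaskDAG::get_ready_tasks() const {
    std::vector<std::string> ready;
    for (const auto& [task, count] : unmet_dep_count_)
        if (count == 0 && completed_.count(task) == 0)
            ready.push_back(task);                // all deps met, not yet done
    return ready;
}
std::vector<std::string> TaskDAG::complete_task(const std::string& task_id) {
    completed_.insert(task_id);
    std::vector<std::string> newly_ready;
    auto it = forward_edges_.find(task_id);
    if (it == forward_edges_.end()) return newly_ready;
    for (const auto& dependent : it->second)
        if (--unmet_dep_count_[dependent] == 0)
            newly_ready.push_back(dependent);     // hand these straight to the scheduler
    return newly_ready;
}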
Worker Load Balancer
class WorkerLoadBalancer {
public:
// Register worker with capacity
void add_worker(const std::string& id, int capacity);
// Remove worker (on failure)
void remove_worker(const std::string& id);
// Update available capacity
void update_capacity(const std::string& id, int available);
// Select best worker for task (returns nullopt if none available)
std::optional<std::string> select_worker(const Task& task);
// Strategies
enum class Strategy {
LeastLoaded, // Pick worker with most available slots
RoundRobin, // Rotate through workers
Random, // Random selection from available
LocalityAware // Prefer workers with cached data
};
void set_strategy(Strategy s);
private:
std::unordered_map<std::string, int> worker_capacity_;
std::unordered_map<std::string, int> worker_available_;
Strategy strategy_ = Strategy::LeastLoaded;
std::atomic<size_t> round_robin_index_{0};
};
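As an example, the LeastLoaded strategy is a straight scan over free slots. A sketch of just that branch (the Task parameter is unused here but matters for LocalityAware):
std::optional<std::string> WorkerLoadBalancer::select_worker(const Task& /*task*/) {
    // LeastLoaded only: pick the worker with the most available slots.
    std::optional<std::string> best;
    int best_available = 0;
    for (const auto& [id, available] : worker_available_) {
        if (available > best_available) {
            best_available = available;
            best = id;
        }
    }
    return best;                                  // nullopt when every worker is full
}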
Algorithm Overview
Scheduling Algorithm
SCHEDULE_TASKS():
while not shutdown:
# 1. Check for ready tasks across all jobs
ready_tasks = []
for job in active_jobs:
for task in job.get_ready_tasks():
if task.state == PENDING:
ready_tasks.append(task)
# 2. Sort by priority (if implemented)
ready_tasks.sort(by=priority, order=descending)
# 3. Assign to available workers
for task in ready_tasks:
worker = load_balancer.select_worker(task)
if worker:
assign_task(task, worker)
task.state = ASSIGNED
task.assigned_at = now()
else:
# No workers available, try again later
break
# 4. Check for timeouts
for task in running_tasks:
if now() - task.assigned_at > task.timeout:
handle_timeout(task)
sleep(10ms) # Scheduling interval
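Translated into C++ against the Coordinator members sketched earlier, the scheduling pass might look roughly like this (a sketch: it assumes a shutdown_ flag as used in Hint 3 below and keeps lock granularity deliberately coarse):
void Coordinator::schedule_loop() {
    using namespace std::chrono_literals;
    while (!shutdown_) {                                   // shutdown_ flag assumed, as in Hint 3
        {
            std::unique_lock lock(jobs_mutex_);
            for (auto& [job_id, job] : jobs_) {
                for (const auto& task_id : get_ready_tasks(job)) {
                    Task& task = job.tasks[task_id];
                    if (task.state != TaskState::PENDING) continue;
                    auto worker = select_worker(task);     // consults the worker registry
                    if (!worker) break;                    // no capacity; try again next pass
                    task.state = TaskState::ASSIGNED;
                    task.assigned_worker = *worker;
                    task.assigned_at = std::chrono::steady_clock::now();
                    // Actual dispatch (push) or enqueue-for-pull happens outside this lock.
                }
            }
        }
        std::this_thread::sleep_for(10ms);                 // scheduling interval from the pseudocode
    }
}
The timeout check from step 4 is handled separately; see check_running_tasks under Pitfall 3.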
Failure Recovery Algorithm
HANDLE_WORKER_FAILURE(worker_id):
# 1. Mark worker as dead
worker.state = DEAD
# 2. Collect all tasks assigned to this worker
affected_tasks = []
for task in worker.assigned_tasks:
affected_tasks.append(task)
# 3. Reset task state for rescheduling
for task in affected_tasks:
if task.retry_count < MAX_RETRIES:
task.state = PENDING
task.assigned_worker = None
task.retry_count += 1
log("Rescheduling task {} (retry {})", task.id, task.retry_count)
else:
task.state = FAILED
log("Task {} exceeded max retries, marking as FAILED", task.id)
mark_job_failed(task.job_id)
# 4. Remove worker from registry
workers.remove(worker_id)
# 5. Trigger immediate scheduling
notify_scheduler()
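The same steps in C++, as a sketch; kMaxRetries and WorkerState::DEAD are assumptions consistent with the structs and log output shown earlier:
void Coordinator::handle_worker_failure(const std::string& worker_id) {
    constexpr int kMaxRetries = 3;                  // assumed default, matching the example config
    std::vector<std::string> affected;
    {
        std::unique_lock lock(workers_mutex_);
        auto it = workers_.find(worker_id);
        if (it == workers_.end()) return;
        it->second.state = WorkerState::DEAD;       // step 1: mark dead
        affected = it->second.assigned_tasks;       // step 2: collect its tasks
        workers_.erase(it);                         // step 4: drop from the registry
    }
    std::unique_lock lock(jobs_mutex_);
    for (auto& [job_id, job] : jobs_) {             // step 3: reset or fail each affected task
        for (const auto& task_id : affected) {
            auto t = job.tasks.find(task_id);
            if (t == job.tasks.end()) continue;
            Task& task = t->second;
            if (task.retry_count < kMaxRetries) {
                task.state = TaskState::PENDING;
                task.assigned_worker.reset();
                ++task.retry_count;
            } else {
                task.state = TaskState::FAILED;     // job-level failure handling not shown
            }
        }
    }
    // Step 5: wake the scheduler so rescheduling happens immediately (mechanism not shown).
}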
Result Aggregation Algorithm
AGGREGATE_RESULTS(job_id):
job = jobs[job_id]
# Check if all tasks complete
for task in job.tasks:
if task.state != COMPLETED:
return # Not ready yet
# Collect results in dependency order
ordered_tasks = job.dag.topological_sort()
results = {}
for task_id in ordered_tasks:
task = job.tasks[task_id]
results[task_id] = task.result
# Find final tasks (no dependents)
final_tasks = job.dag.get_terminal_tasks()
# Merge final results
job.final_result = merge_results([results[t] for t in final_tasks])
job.state = COMPLETED
job.completed_at = now()
notify_client(job_id, job.final_result)
Implementation Guide
Development Environment Setup
Required tools:
# C++20 compiler
clang++ --version # 14+ recommended
g++ --version # 11+ recommended
# gRPC and Protobuf
brew install grpc protobuf # macOS
apt install libgrpc++-dev protobuf-compiler # Ubuntu
# CMake
cmake --version # 3.20+
# Optional but recommended
brew install abseil # For advanced utilities
CMakeLists.txt:
cmake_minimum_required(VERSION 3.20)
project(dtask)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
find_package(Protobuf REQUIRED)
find_package(gRPC CONFIG REQUIRED)
# Generate protobuf/gRPC code
add_custom_command(
OUTPUT scheduler.pb.cc scheduler.pb.h scheduler.grpc.pb.cc scheduler.grpc.pb.h
COMMAND protoc --cpp_out=. --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` scheduler.proto
DEPENDS scheduler.proto
)
# Coordinator
add_executable(dtask-coordinator
src/coordinator/main.cpp
src/coordinator/coordinator.cpp
src/coordinator/job_manager.cpp
src/coordinator/scheduler.cpp
src/coordinator/worker_registry.cpp
scheduler.pb.cc
scheduler.grpc.pb.cc
)
target_link_libraries(dtask-coordinator gRPC::grpc++ protobuf::libprotobuf pthread)
# Worker
add_executable(dtask-worker
src/worker/main.cpp
src/worker/worker.cpp
src/worker/task_executor.cpp
src/common/thread_pool.cpp
scheduler.pb.cc
scheduler.grpc.pb.cc
)
target_link_libraries(dtask-worker gRPC::grpc++ protobuf::libprotobuf pthread)
# CLI
add_executable(dtask
src/cli/main.cpp
src/cli/submit.cpp
src/cli/status.cpp
scheduler.pb.cc
scheduler.grpc.pb.cc
)
target_link_libraries(dtask gRPC::grpc++ protobuf::libprotobuf)
# Tests
enable_testing()
add_executable(dtask-tests
tests/dag_tests.cpp
tests/scheduler_tests.cpp
tests/integration_tests.cpp
)
target_link_libraries(dtask-tests gtest_main pthread)
Project Structure
dtask/
├── CMakeLists.txt
├── proto/
│ └── scheduler.proto # gRPC service definition
├── src/
│ ├── coordinator/
│ │ ├── main.cpp # Coordinator entry point
│ │ ├── coordinator.hpp # Main coordinator class
│ │ ├── coordinator.cpp
│ │ ├── job_manager.hpp # Job and task tracking
│ │ ├── job_manager.cpp
│ │ ├── scheduler.hpp # Task scheduling logic
│ │ ├── scheduler.cpp
│ │ ├── worker_registry.hpp # Worker tracking
│ │ └── worker_registry.cpp
│ ├── worker/
│ │ ├── main.cpp # Worker entry point
│ │ ├── worker.hpp # Main worker class
│ │ ├── worker.cpp
│ │ ├── task_executor.hpp # Task execution
│ │ └── task_executor.cpp
│ ├── cli/
│ │ ├── main.cpp # CLI entry point
│ │ ├── submit.cpp # Job submission
│ │ └── status.cpp # Status queries
│ └── common/
│ ├── thread_pool.hpp # From Project 3
│ ├── thread_pool.cpp
│ ├── lock_free_queue.hpp # From Project 6
│ ├── dag.hpp # Task dependency graph
│ ├── dag.cpp
│ ├── types.hpp # Shared types
│ └── logging.hpp # Logging utilities
├── tests/
│ ├── dag_tests.cpp
│ ├── scheduler_tests.cpp
│ ├── failure_tests.cpp
│ └── integration_tests.cpp
├── examples/
│ ├── wordcount.yaml # Example job definition
│ └── pipeline.yaml
└── README.md
The Core Question You’re Answering
“How do you coordinate task execution across multiple unreliable machines while respecting dependencies, handling failures, and maximizing throughput?”
This is the fundamental question of distributed computing. Your implementation must answer:
- How do you track what work needs to be done? (Job manager, DAG)
- How do you decide where to run it? (Scheduler, load balancer)
- How do you know if a worker is alive? (Heartbeats)
- What happens when things fail? (Retry logic, state machine)
- How do you get results back? (Aggregation, client notification)
Concepts You Must Understand First
Before starting implementation, verify you understand these concepts:
| Concept | Self-Check Question | Book Reference |
|---|---|---|
| gRPC | Can you explain the difference between unary and streaming RPCs? | gRPC documentation |
| Protobuf | Can you define a message with nested types and enums? | Protocol Buffers docs |
| DAG | Can you implement topological sort? | CLRS Chapter 22 |
| State machines | Can you draw the state diagram for a task? | Design Patterns |
| Thread pools | Can you explain how work-stealing balances load? | Project 3 |
| Heartbeats | Can you explain how to detect a failed node? | Kleppmann Chapter 8 |
| Retries | Can you explain idempotency and why it matters? | Kleppmann Chapter 8 |
Questions to Guide Your Design
Architecture Questions:
- Should task state live in the coordinator only, or be replicated?
- How should workers discover the coordinator? (Config file? DNS? Service discovery?)
- What happens if the coordinator crashes? (State persistence? Leader election?)
Scheduling Questions:
- Push model (coordinator sends tasks) or pull model (workers request tasks)?
- How do you handle “sticky” tasks that should run on specific workers?
- How do you prevent task starvation for low-priority jobs?
Failure Questions:
- How long should heartbeat timeout be? (Trade-off: fast detection vs false positives)
- What if a task has side effects that can’t be retried?
- How do you handle partial job failure?
Performance Questions:
- How do you batch small tasks to reduce RPC overhead?
- How do you minimize lock contention in the coordinator?
- How do you scale to 100+ workers?
Thinking Exercise
Before coding, trace through this scenario manually:
Job submission:
Task A: no dependencies
Task B: depends on A
Task C: depends on A
Task D: depends on B, C
Workers: W1 (2 slots), W2 (2 slots)
Trace the execution:
1. What tasks can run immediately?
2. Draw the coordinator's state after A completes on W1
3. What if W2 crashes while running C?
4. When does D become ready?
5. What is the total execution time if each task takes 1 second?
Draw the timeline:
Time: 0 1 2 3 4
W1: [A] [B] [D]
W2: [C] [C] ...
^ ^ ^
| | |
| | D can start (B,C done)
| B,C can start (A done)
A starts (no deps)
Hints in Layers
Hint 1: Starting Point (Conceptual Direction)
Start with the simplest possible system that works:
- Single-node coordinator (no replication)
- Simple FIFO scheduling (no priorities)
- Tasks have no dependencies (parallel batch)
- No failure recovery yet
Get this working end-to-end, then add complexity.
Hint 2: Next Level (More Specific Guidance)
Implement in this order:
- gRPC skeleton: Define service, generate code, verify connectivity
- Worker registration: Workers can connect and report capacity
- Task submission: Coordinator accepts jobs with tasks
- Basic scheduling: Assign tasks to workers round-robin
- Result reporting: Workers report completion
- DAG support: Add dependency tracking
- Heartbeats: Detect worker failures
- Retry logic: Reschedule failed tasks
Hint 3: Technical Details (Approach/Pseudocode)
Coordinator main loop:
void Coordinator::run() {
// Start gRPC server in background
grpc_server_thread_ = std::thread([this] {
grpc::ServerBuilder builder;
builder.AddListeningPort(address_, grpc::InsecureServerCredentials());
builder.RegisterService(this);
auto server = builder.BuildAndStart();
server->Wait();
});
// Start scheduler thread
scheduler_thread_ = std::thread([this] {
while (!shutdown_) {
schedule_ready_tasks();
std::this_thread::sleep_for(10ms);
}
});
// Start heartbeat checker
heartbeat_thread_ = std::thread([this] {
while (!shutdown_) {
check_worker_health();
std::this_thread::sleep_for(1s);
}
});
// Wait for shutdown
shutdown_cv_.wait(...);
}
Worker main loop:
void Worker::run() {
// Register with coordinator
register_with_coordinator();
// Start heartbeat thread
heartbeat_thread_ = std::thread([this] {
while (!shutdown_) {
send_heartbeat();
std::this_thread::sleep_for(1s);
}
});
// Main task execution loop
while (!shutdown_) {
// Request task from coordinator
auto task = request_task();
if (task) {
// Execute using thread pool
auto future = thread_pool_->submit([this, task] {
return execute_task(*task);
});
// Report result when done
auto result = future.get();
report_result(task->task_id, result);
} else {
// No tasks available, back off
std::this_thread::sleep_for(100ms);
}
}
}
Hint 4: Tools and Debugging
Debugging distributed systems is HARD. Use these tools:
- Structured logging with request IDs:
#define LOG(level, ...) log_impl(level, job_id, task_id, __VA_ARGS__)
// Enables tracing a single task through the system
- State visualization:
void Coordinator::dump_state() {
    for (const auto& [job_id, job] : jobs_) {
        std::cout << "Job " << job_id << ":\n";
        for (const auto& [task_id, task] : job.tasks) {
            std::cout << "  " << task_id << ": " << to_string(task.state);
            if (task.assigned_worker) {
                std::cout << " on " << *task.assigned_worker;
            }
            std::cout << "\n";
        }
    }
}
- Simulate failures in tests:
class FaultInjector {
public:
    void set_worker_crash_probability(double p);
    void set_network_delay_ms(int min, int max);
    void crash_worker(const std::string& id);
    void partition_network(const std::string& worker_id);
};
- ThreadSanitizer for race detection:
clang++ -fsanitize=thread -g -O1 ...
The Interview Questions They’ll Ask
After completing this project, be ready to answer:
- “How would you handle coordinator failure?”
- Answer: State persistence to disk/database, or leader election with Raft for HA
- “What happens if two workers both think they’re executing the same task?”
- Answer: Idempotency tokens, at-most-once delivery, or conflict resolution
- “How would you scale to 10,000 workers?”
- Answer: Hierarchical scheduling, partitioned coordinators, pull-based task assignment
- “What if tasks take hours and heartbeats fail occasionally?”
- Answer: Adaptive heartbeat intervals, task-level heartbeats, lease-based ownership
- “How would you add exactly-once semantics?”
- Answer: Two-phase commit, saga pattern, or idempotent task design
- “Walk me through what happens when you submit a job with 100 dependent tasks.”
- Answer: (Trace through your implementation step by step)
- “How do you prevent a slow task from blocking the entire job?”
- Answer: Speculative execution, task splitting, timeout with retry on different worker
- “What’s the difference between this and Kubernetes?”
- Answer: K8s schedules long-running containers; this schedules short-lived tasks with dependencies
Books That Will Help
| Topic | Book | Chapters |
|---|---|---|
| Distributed Systems Fundamentals | “Designing Data-Intensive Applications” by Martin Kleppmann | Part II (entire) |
| Consensus and Replication | “Designing Data-Intensive Applications” by Martin Kleppmann | Chapters 5, 9 |
| Failure Detection | “Distributed Systems” by Tanenbaum | Chapter 8 |
| gRPC and Protobuf | gRPC documentation | Core concepts, C++ tutorial |
| MapReduce Pattern | Original MapReduce paper (Dean & Ghemawat) | Entire paper |
| Thread Pool Implementation | “C++ Concurrency in Action” by Anthony Williams | Chapter 9 |
Implementation Phases
Phase 1: Foundation (Week 1-2)
Goal: Basic coordinator-worker communication
Tasks:
- Define protobuf messages and gRPC service
- Implement coordinator gRPC server skeleton
- Implement worker registration
- Test: Worker can connect and register
Success criteria: ./dtask worker connects to ./dtask coordinator and appears in worker list
Phase 2: Basic Scheduling (Week 2-3)
Goal: Execute independent tasks
Tasks:
- Implement job submission (no dependencies yet)
- Implement simple round-robin scheduling
- Implement task execution in worker thread pool
- Implement result reporting
Success criteria: Submit 100 independent tasks, all complete successfully
Phase 3: Dependency Graph (Week 3-4)
Goal: Execute tasks with dependencies
Tasks:
- Implement DAG data structure
- Add dependency tracking to job manager
- Compute ready tasks based on completed dependencies
- Test topological execution order
Success criteria: Diamond dependency pattern (A -> B, A -> C, B+C -> D) executes correctly
Phase 4: Heartbeats and Failure Detection (Week 4-5)
Goal: Detect and handle worker failures
Tasks:
- Implement heartbeat protocol
- Implement heartbeat timeout detection
- Implement task reassignment on worker failure
- Test failure scenarios
Success criteria: Kill a worker mid-execution, tasks get rescheduled and complete
Phase 5: Retry Logic and Timeouts (Week 5-6)
Goal: Handle task-level failures
Tasks:
- Implement task timeout detection
- Implement retry logic with backoff (see the delay sketch after this phase)
- Implement max retry limits
- Handle permanently failed tasks
Success criteria: Simulate task failures, retries happen, eventually succeeds or fails cleanly
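For the backoff itself, one common choice (not mandated here) is exponential backoff with full jitter before a retried task re-enters the ready pool; a small self-contained sketch:
#include <algorithm>
#include <chrono>
#include <random>
// Exponential backoff with full jitter: delay drawn uniformly from [0, min(cap, base * 2^attempt)].
std::chrono::milliseconds retry_delay(int attempt,
                                      std::chrono::milliseconds base = std::chrono::milliseconds(100),
                                      std::chrono::milliseconds cap  = std::chrono::milliseconds(10'000)) {
    long long exp   = base.count() * (1LL << std::min(attempt, 20));   // clamp shift to avoid overflow
    long long upper = std::min<long long>(exp, cap.count());
    static thread_local std::mt19937_64 rng{std::random_device{}()};
    std::uniform_int_distribution<long long> dist(0, upper);
    return std::chrono::milliseconds(dist(rng));
}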
Phase 6: Result Aggregation (Week 6-7)
Goal: Collect and return job results
Tasks:
- Implement result storage in coordinator
- Implement result aggregation for completed jobs
- Implement client notification
- Test full job lifecycle
Success criteria: Submit MapReduce-style job, get aggregated result
Phase 7: Polish and Performance (Week 7-10)
Goal: Production-quality system
Tasks:
- Add comprehensive logging
- Add metrics (task throughput, latency percentiles; see the recorder sketch after this phase)
- Optimize hot paths
- Stress testing with many workers and tasks
- Documentation
Success criteria: 10+ workers, 10,000+ tasks, <100ms P99 scheduling latency
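For the latency percentiles, a naive recorder that sorts samples on demand is enough for these tests; production systems usually export fixed-bucket histograms (e.g., to Prometheus) instead. A sketch:
#include <algorithm>
#include <cstddef>
#include <mutex>
#include <vector>
class LatencyRecorder {
public:
    void record_ms(double ms) {
        std::lock_guard lock(mu_);
        samples_.push_back(ms);
    }
    double percentile(double p) {                 // p in [0, 100], e.g. 50 or 99
        std::lock_guard lock(mu_);
        if (samples_.empty()) return 0.0;
        std::vector<double> sorted = samples_;
        std::sort(sorted.begin(), sorted.end());
        std::size_t idx = static_cast<std::size_t>(p / 100.0 * (sorted.size() - 1));
        return sorted[idx];
    }
private:
    std::mutex mu_;
    std::vector<double> samples_;
};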
Key Implementation Decisions
Decision 1: Push vs Pull Model
PUSH Model:
Coordinator actively sends tasks to workers
+ Coordinator has full control
+ Lower latency for new tasks
- Requires tracking worker capacity accurately
- More complex failure handling
PULL Model:
Workers request tasks when idle
+ Simpler coordinator logic
+ Natural load balancing
+ Workers control their own pace
- Potentially higher latency (poll interval)
- Empty polls waste resources
Recommendation: Start with PULL for simplicity
Decision 2: Synchronous vs Async gRPC
Synchronous:
- Simpler to implement
- Each RPC blocks a thread
- Limited scalability
Asynchronous:
- More complex (completion queues)
- Single thread handles many connections
- Better scalability
Recommendation: Start synchronous, optimize later if needed
Decision 3: State Persistence
In-Memory Only:
- Simple, fast
- State lost on coordinator crash
- Good for development/testing
Persistent (SQLite/RocksDB):
- State survives restarts
- More complex
- Required for production
Recommendation: Start in-memory, add persistence in Phase 7
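If you do add persistence in Phase 7, a small first step is a periodic snapshot written atomically (temp file plus rename). The sketch below only records a job-id-to-state map and assumes you already have a way to flatten your Job structs into it:
#include <cstdio>        // std::rename
#include <fstream>
#include <string>
#include <unordered_map>
// Write one "job_id state" line per job, then rename into place so a reader
// never sees a half-written snapshot. Task-level state needs the same treatment.
void snapshot_jobs(const std::unordered_map<std::string, int>& job_states,
                   const std::string& path) {
    const std::string tmp = path + ".tmp";
    {
        std::ofstream out(tmp, std::ios::trunc);
        for (const auto& [id, state] : job_states)
            out << id << ' ' << state << '\n';
    }                                             // stream flushed and closed here
    std::rename(tmp.c_str(), path.c_str());       // atomic replace on POSIX filesystems
}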
Testing Strategy
Unit Tests
// DAG tests
TEST(TaskDAG, ReadyTasksWithNoDeps) {
TaskDAG dag;
dag.add_task("A", {});
dag.add_task("B", {});
dag.add_task("C", {});
auto ready = dag.get_ready_tasks();
EXPECT_EQ(ready.size(), 3);
}
TEST(TaskDAG, ReadyTasksWithDeps) {
TaskDAG dag;
dag.add_task("A", {});
dag.add_task("B", {"A"});
dag.add_task("C", {"A"});
dag.add_task("D", {"B", "C"});
// Initially only A is ready
auto ready = dag.get_ready_tasks();
EXPECT_EQ(ready.size(), 1);
EXPECT_EQ(ready[0], "A");
// After A completes, B and C become ready
dag.complete_task("A");
ready = dag.get_ready_tasks();
EXPECT_EQ(ready.size(), 2);
// After B and C complete, D becomes ready
dag.complete_task("B");
dag.complete_task("C");
ready = dag.get_ready_tasks();
EXPECT_EQ(ready.size(), 1);
EXPECT_EQ(ready[0], "D");
}
TEST(TaskDAG, CycleDetection) {
TaskDAG dag;
dag.add_task("A", {"B"});
dag.add_task("B", {"A"});
EXPECT_TRUE(dag.has_cycles());
}
// Scheduler tests
TEST(Scheduler, LeastLoadedWorkerSelection) {
WorkerLoadBalancer lb;
lb.add_worker("w1", 4);
lb.add_worker("w2", 8);
lb.set_strategy(Strategy::LeastLoaded);
// w2 has more capacity
lb.update_capacity("w1", 2);
lb.update_capacity("w2", 6);
auto selected = lb.select_worker(Task{});
EXPECT_EQ(selected, "w2");
}
// State machine tests
TEST(TaskStateMachine, ValidTransitions) {
Task task;
task.state = TaskState::PENDING;
EXPECT_TRUE(task.can_transition_to(TaskState::ASSIGNED));
EXPECT_FALSE(task.can_transition_to(TaskState::COMPLETED));
}
Integration Tests
TEST(Integration, BasicJobExecution) {
// Start coordinator in background
auto coordinator = std::make_unique<Coordinator>("localhost:8080");
std::thread coord_thread([&] { coordinator->run(); });
// Start worker
auto worker = std::make_unique<Worker>("localhost:8080", 4, "test-worker");
std::thread worker_thread([&] { worker->run(); });
// Wait for registration
std::this_thread::sleep_for(100ms);
// Submit job
JobSubmission job;
job.set_name("test-job");
auto* task = job.add_tasks();
task->set_id("task-1");
task->set_type("echo");
auto client = CreateClient("localhost:8080");
auto response = client->SubmitJob(job);
EXPECT_TRUE(response.ok());
// Wait for completion
std::this_thread::sleep_for(1s);
// Check status
auto status = client->GetJobStatus(response.job_id());
EXPECT_EQ(status.state(), JobState::COMPLETED);
// Cleanup
coordinator->shutdown();
worker->shutdown();
coord_thread.join();
worker_thread.join();
}
TEST(Integration, WorkerFailureRecovery) {
auto coordinator = start_coordinator();
// Start two workers
auto worker1 = start_worker("worker-1");
auto worker2 = start_worker("worker-2");
// Submit job with long-running tasks
auto job = submit_long_running_job(10); // 10 tasks, each takes 5s
// Wait for tasks to start
std::this_thread::sleep_for(500ms);
// Kill worker1
worker1->crash();
// Wait for job completion
wait_for_job_completion(job.id(), 60s);
// Verify all tasks completed (some retried on worker2)
auto status = get_job_status(job.id());
EXPECT_EQ(status.state(), JobState::COMPLETED);
EXPECT_GE(status.retry_count(), 1); // At least one retry
cleanup();
}
TEST(Integration, DAGExecution) {
auto coordinator = start_coordinator();
auto worker = start_worker("worker-1");
// Submit diamond DAG: A -> B, A -> C, B+C -> D
JobSubmission job;
add_task(job, "A", {});
add_task(job, "B", {"A"});
add_task(job, "C", {"A"});
add_task(job, "D", {"B", "C"});
auto response = submit_job(job);
// Track execution order
std::vector<std::string> execution_order;
wait_for_job_completion(response.job_id(), 10s);
// Verify order respects dependencies
auto status = get_job_status(response.job_id());
EXPECT_TRUE(completed_before(status, "A", "B"));
EXPECT_TRUE(completed_before(status, "A", "C"));
EXPECT_TRUE(completed_before(status, "B", "D"));
EXPECT_TRUE(completed_before(status, "C", "D"));
}
Stress Tests
TEST(Stress, ManyTasks) {
auto coordinator = start_coordinator();
// Start 10 workers with 8 threads each
std::vector<std::unique_ptr<Worker>> workers;
for (int i = 0; i < 10; ++i) {
workers.push_back(start_worker("worker-" + std::to_string(i)));
}
// Submit 10,000 independent tasks
JobSubmission job;
for (int i = 0; i < 10000; ++i) {
add_task(job, "task-" + std::to_string(i), {});
}
auto start = std::chrono::steady_clock::now();
auto response = submit_job(job);
wait_for_job_completion(response.job_id(), 60s);
auto end = std::chrono::steady_clock::now();
auto status = get_job_status(response.job_id());
EXPECT_EQ(status.state(), JobState::COMPLETED);
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "10,000 tasks completed in " << duration.count() << "ms\n";
std::cout << "Throughput: " << 10000.0 / (duration.count() / 1000.0) << " tasks/sec\n";
// Should complete in reasonable time with 80 threads
EXPECT_LT(duration.count(), 30000); // 30 seconds max
}
TEST(Stress, ChaosMonkey) {
// Run for 60 seconds with random failures
auto coordinator = start_coordinator();
// Start/stop workers randomly
std::atomic<bool> stop{false};
std::thread chaos_thread([&] {
while (!stop) {
auto action = random_action();
switch (action) {
case Action::StartWorker:
start_random_worker();
break;
case Action::KillWorker:
kill_random_worker();
break;
case Action::NetworkPartition:
partition_random_worker();
break;
}
std::this_thread::sleep_for(random_interval(100ms, 1s));
}
});
// Submit jobs continuously
std::atomic<int> completed_jobs{0};
std::atomic<int> failed_jobs{0};
for (int i = 0; i < 100; ++i) {
auto response = submit_random_job();
auto status = wait_for_job(response.job_id(), 30s);
if (status.state() == JobState::COMPLETED) {
completed_jobs++;
} else {
failed_jobs++;
}
}
stop = true;
chaos_thread.join();
std::cout << "Completed: " << completed_jobs << ", Failed: " << failed_jobs << "\n";
// Most jobs should complete despite chaos
EXPECT_GT(completed_jobs.load(), 80);
}
Common Pitfalls & Debugging
Pitfall 1: Race Condition in Task Assignment
Symptom: Same task assigned to multiple workers, or task state corrupted.
Cause: Multiple scheduler threads or RPCs modifying task state concurrently.
Solution:
// Use mutex for task state modifications
std::unique_lock lock(task_mutex_);
if (task.state == TaskState::PENDING) {
task.state = TaskState::ASSIGNED;
task.assigned_worker = worker_id;
task.assigned_at = now();
}
// Release lock before network call
lock.unlock();
send_assignment_to_worker(...);
Pitfall 2: Heartbeat False Positives
Symptom: Workers marked as dead even though they’re running fine.
Cause: Network delay or coordinator overload causing missed heartbeat processing.
Solution:
// Use multiple missed heartbeats before declaring dead
const int MISS_THRESHOLD = 3;
void check_worker_health() {
for (auto& [id, worker] : workers_) {
auto elapsed = now() - worker.last_heartbeat;
if (elapsed > heartbeat_interval_ * MISS_THRESHOLD) {
// Missed 3 heartbeats, consider dead
handle_worker_failure(id);
}
}
}
Pitfall 3: Task Stuck in RUNNING Forever
Symptom: Task shows as RUNNING but worker is dead.
Cause: Worker crashed after acknowledging task but before completing.
Solution:
// Check both heartbeat AND task timeout
void check_running_tasks() {
for (auto& [job_id, job] : jobs_) {
for (auto& [task_id, task] : job.tasks) {
if (task.state == TaskState::RUNNING) {
auto elapsed = now() - task.assigned_at;
if (elapsed > task_timeout_) {
// Task exceeded timeout
LOG("Task {} timed out", task_id);
reschedule_task(task);
}
}
}
}
}
Pitfall 4: Deadlock in Dependency Resolution
Symptom: Job never completes even though no tasks are running.
Cause: Circular dependency or dependency on non-existent task.
Solution:
// Validate DAG on job submission
bool validate_job(const Job& job) {
// Check for cycles
if (job.dag.has_cycles()) {
return false;
}
// Check all dependencies exist
for (const auto& [task_id, task] : job.tasks) {
for (const auto& dep : task.dependencies) {
if (job.tasks.find(dep) == job.tasks.end()) {
LOG("Task {} depends on non-existent task {}", task_id, dep);
return false;
}
}
}
return true;
}
Pitfall 5: Memory Leak from Unfinished Jobs
Symptom: Coordinator memory grows over time.
Cause: Completed/failed jobs not being cleaned up.
Solution:
// Clean up old jobs periodically
void cleanup_old_jobs() {
auto cutoff = now() - retention_period_;
std::unique_lock lock(jobs_mutex_);
for (auto it = jobs_.begin(); it != jobs_.end(); ) {
if (it->second.completed_at < cutoff &&
(it->second.state == JobState::COMPLETED ||
it->second.state == JobState::FAILED)) {
it = jobs_.erase(it);
} else {
++it;
}
}
}
Pitfall 6: Thundering Herd on Coordinator Restart
Symptom: Coordinator overwhelmed when restarting with many workers.
Cause: All workers try to re-register simultaneously.
Solution:
// Workers: Jittered reconnection
void Worker::reconnect() {
auto jitter = random_interval(0ms, 5000ms);
std::this_thread::sleep_for(jitter);
connect_to_coordinator();
}
// Coordinator: Rate limit registrations
class RegistrationLimiter {
std::atomic<int> pending_{0};
static constexpr int MAX_CONCURRENT = 10;
public:
bool try_acquire() {
int expected = pending_.load();
while (expected < MAX_CONCURRENT) {
if (pending_.compare_exchange_weak(expected, expected + 1)) {
return true;
}
}
return false; // Reject, try again later
}
void release() { pending_.fetch_sub(1); }
};
Extensions & Challenges
Extension 1: Priority Scheduling
Add task priorities with preemption:
enum class Priority { Low, Normal, High, Critical };
// Priority queue for ready tasks
struct PriorityComparator {
bool operator()(const Task& a, const Task& b) {
return a.priority < b.priority; // Higher priority first
}
};
std::priority_queue<Task, std::vector<Task>, PriorityComparator> ready_queue_;
Extension 2: Work Stealing Across Workers
When one worker is idle while another is overloaded:
// Worker requests task from coordinator
// If no tasks from coordinator, coordinator tells worker to steal from peer
message StealRequest {
string worker_id = 1;
}
message StealResponse {
oneof result {
TaskAssignment task = 1;
string steal_from_worker = 2; // Address of peer to steal from
bool no_work_available = 3;
}
}
Extension 3: Coordinator High Availability
Implement leader election with Raft:
class RaftCoordinator : public Coordinator {
// Raft state
enum class State { Follower, Candidate, Leader };
State state_ = State::Follower;
int current_term_ = 0;
std::optional<std::string> voted_for_;
std::vector<LogEntry> log_;
// Replicate all state changes through Raft
void replicate_state_change(const StateChange& change);
// Election
void start_election();
void become_leader();
void become_follower(int term);
};
Extension 4: Speculative Execution
Run slow tasks on multiple workers:
// If task is taking too long, speculatively run on another worker
void check_for_speculation() {
for (const auto& task : running_tasks_) {
auto elapsed = now() - task.started_at;
auto expected = estimate_task_duration(task);
if (elapsed > expected * SPECULATION_THRESHOLD) {
// Task is slow, start speculative copy
assign_speculative_task(task);
}
}
}
// When any copy completes, cancel the others
void on_task_complete(const std::string& task_id) {
cancel_speculative_copies(task_id);
// ... rest of completion logic
}
Extension 5: Resource Constraints
Schedule based on memory/CPU requirements:
struct TaskRequirements {
int cpu_cores = 1;
size_t memory_mb = 512;
size_t disk_mb = 0;
std::vector<std::string> labels; // e.g., "gpu", "ssd"
};
struct WorkerResources {
int total_cores;
int available_cores;
size_t total_memory_mb;
size_t available_memory_mb;
std::set<std::string> labels;
};
bool can_schedule(const Task& task, const Worker& worker) {
return task.requirements.cpu_cores <= worker.available_cores &&
task.requirements.memory_mb <= worker.available_memory_mb &&
has_all_labels(task.requirements.labels, worker.labels);
}
Challenge: Exactly-Once Execution
Guarantee each task runs exactly once, even with retries:
// Use idempotency keys
struct Task {
std::string id;
std::string idempotency_key; // Stable across retries of the same logical task, so a retry hits the cached result
// ...
};
// Workers check if result already exists
TaskResult execute_task(const Task& task) {
// Check result cache
if (auto result = result_cache_.get(task.idempotency_key)) {
// Already executed, return cached result
return *result;
}
// Execute
auto result = run_task(task);
// Cache result atomically
result_cache_.put(task.idempotency_key, result);
return result;
}
Real-World Connections
How Production Systems Solve This
| System | Architecture | Key Innovation |
|---|---|---|
| Kubernetes | Controller pattern with etcd | Declarative state reconciliation |
| Apache YARN | ResourceManager + NodeManager | Container-based isolation |
| Mesos | Two-level scheduling | Resource offers |
| Temporal | Event sourcing | Durable execution, replay |
| Airflow | DAG-based scheduling | Declarative workflows |
| Spark | Driver + Executors | Resilient distributed datasets |
Industry Practices
- Observability: Distributed tracing (Jaeger), metrics (Prometheus), logging (ELK)
- Configuration: Service discovery (Consul), config management (etcd)
- Communication: gRPC for internal, REST for external
- Testing: Chaos engineering (Chaos Monkey), contract testing
- Deployment: Blue-green, canary releases
Where This Project Leads
- Platform Engineering: Build internal developer platforms
- Distributed Systems Engineer: Design scalable systems
- SRE/Infrastructure: Operate and improve distributed systems
- Technical interviews: Strong distributed systems fundamentals
Resources
Essential Reading
- “Designing Data-Intensive Applications” by Martin Kleppmann
- Chapters 5, 8, 9: Replication, fault tolerance, consistency
- “MapReduce: Simplified Data Processing on Large Clusters” (Dean & Ghemawat)
- Original paper on distributed batch processing
- “In Search of an Understandable Consensus Algorithm” (Raft paper)
- Diego Ongaro, John Ousterhout
- gRPC Documentation
- C++ quickstart, concepts, best practices
Code to Study
| Project | Language | Notes |
|---|---|---|
| Ray | Python/C++ | Distributed computing framework |
| Temporal | Go | Workflow orchestration |
| Dask | Python | Distributed computing scheduler |
| etcd | Go | Distributed key-value store |
| grpc | C++ | gRPC reference implementation |
Tools
- gRPC: Service communication
- Protobuf: Serialization
- Prometheus: Metrics collection
- Grafana: Metrics visualization
- Jaeger: Distributed tracing
- ThreadSanitizer: Race detection
Self-Assessment Checklist
Understanding
- I can explain the difference between coordinator and worker responsibilities
- I can describe the task state machine and all transitions
- I can explain how DAG dependencies affect execution order
- I can describe what happens when a worker fails mid-task
- I understand the trade-offs between push and pull scheduling models
- I can explain why heartbeats are necessary and their limitations
- I understand idempotency and why it matters for retries
Implementation
- Coordinator accepts gRPC connections from workers
- Workers register and report capacity
- Tasks are assigned to workers based on availability
- Task results are reported back to coordinator
- DAG dependencies are respected in execution order
- Heartbeat timeout correctly detects worker failures
- Failed tasks are rescheduled on different workers
- Jobs complete successfully when all tasks finish
Performance
- System handles 10+ workers simultaneously
- Task scheduling latency is < 100ms P99
- System recovers from worker failure in < 5 seconds
- No memory leaks during extended operation
- ThreadSanitizer reports no races
Testing
- Unit tests for DAG, scheduler, state machine
- Integration tests for happy path
- Failure tests (worker crash, timeout)
- Stress tests with many tasks
- Chaos testing with random failures
Submission / Completion Criteria
Minimum Viable Completion
- Coordinator and worker processes start and communicate
- Workers register with coordinator
- Tasks are submitted and executed on workers
- Results are reported back to coordinator
- Simple jobs complete successfully
Full Completion
- Task dependencies (DAG) work correctly
- Heartbeat-based failure detection
- Task reassignment on worker failure
- Retry logic with configurable limits
- Task timeouts
- Result aggregation
- CLI for submission and status
- 10+ workers, 1000+ tasks stress test passes
Excellence (Going Above & Beyond)
- Priority scheduling
- Work stealing across workers
- Speculative execution
- Metrics and monitoring
- Coordinator persistence/recovery
- Comprehensive chaos testing
- Documentation and diagrams
- Performance optimization (>10K tasks/sec throughput)
Building a distributed task scheduler teaches you the core challenges of distributed computing: coordination, failure handling, and consistency. After this project, you’ll understand how systems like Kubernetes, Airflow, and Spark work under the hood, and you’ll be prepared for infrastructure/platform engineering roles at any scale.
This guide was expanded from LEARN_CPP_CONCURRENCY_AND_PARALLELISM.md. For the complete learning path, see the project index.