LEARN JOB QUEUES DEEP DIVE
Learn Job Processing & Message Queues: From Zero to Distributed Systems Master
Goal: Deeply understand how job processing systems and message queues work—from basic FIFO queues to distributed workers, from RabbitMQ’s AMQP protocol to Kafka’s event streaming, and everything in between.
Why Job Queues Matter
Every production system eventually needs to do something that can’t happen synchronously: send emails, process images, generate reports, sync data with external services. Job queues are the answer, and message brokers are the infrastructure that makes them possible.
After completing these projects, you will:
- Understand the fundamental difference between message queues and event streaming
- Know when to use RabbitMQ vs Kafka vs Redis-based queues vs SQS
- Build reliable, fault-tolerant distributed workers from scratch
- Implement retry patterns with exponential backoff and dead letter queues
- Understand delivery semantics: at-most-once, at-least-once, exactly-once
- Design systems that handle millions of jobs per day
Core Concept Analysis
What is a Job Queue?
A job queue is a data structure that holds tasks to be processed asynchronously. The basic model:
┌──────────┐  push   ┌───────────┐  pull   ┌──────────┐
│ Producer │ ──────▶ │   Queue   │ ──────▶ │  Worker  │
│  (App)   │         │ (Broker)  │         │(Consumer)│
└──────────┘         └───────────┘         └──────────┘
                           │
                           ▼
                     ┌──────────┐
                     │  Worker  │
                     │(Consumer)│
                     └──────────┘
Message Queue vs Event Streaming
These are fundamentally different paradigms:
| Aspect | Message Queue (RabbitMQ) | Event Streaming (Kafka) |
|---|---|---|
| Model | Point-to-point | Publish-subscribe log |
| Message Lifecycle | Deleted after consumption | Retained for configurable time |
| Consumer Model | Push-based | Pull-based |
| Ordering | Per-queue ordering | Per-partition ordering |
| Replayability | No (message gone after ack) | Yes (consumers track offsets) |
| Primary Use | Task distribution | Event sourcing, data pipelines |
The AMQP Model (RabbitMQ)
Producer → Exchange → Binding → Queue → Consumer
│
├─ Direct Exchange: Route by exact key match
├─ Topic Exchange: Route by pattern match
├─ Fanout Exchange: Broadcast to all queues
└─ Headers Exchange: Route by message headers
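Topic exchanges are the subtlest of the four: in a binding pattern, '*' matches exactly one dot-separated word and '#' matches zero or more. A minimal JavaScript sketch of the matching rule (an illustration, not RabbitMQ's actual implementation):
// Sketch: match an AMQP topic binding pattern against a routing key.
// '*' matches exactly one word, '#' matches zero or more words.
function topicMatches(pattern, key) {
  const p = pattern.split('.');
  const k = key.split('.');
  function match(pi, ki) {
    if (pi === p.length) return ki === k.length;
    if (p[pi] === '#') {
      // '#' may swallow zero or more words
      for (let skip = ki; skip <= k.length; skip++) {
        if (match(pi + 1, skip)) return true;
      }
      return false;
    }
    if (ki === k.length) return false;
    return (p[pi] === '*' || p[pi] === k[ki]) && match(pi + 1, ki + 1);
  }
  return match(0, 0);
}

topicMatches('logs.*.error', 'logs.auth.error'); // true
topicMatches('logs.#', 'logs');                  // true ('#' matches zero words)
topicMatches('logs.*', 'logs.auth.error');       // false ('*' is exactly one word)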
The Kafka Model
┌─────────────────────────────────────────────────────────────┐
│ Topic │
├─────────────────┬─────────────────┬─────────────────────────┤
│ Partition 0 │ Partition 1 │ Partition 2 │
├─────────────────┼─────────────────┼─────────────────────────┤
│ [0][1][2][3][4] │ [0][1][2][3] │ [0][1][2][3][4][5] │
│ ↑ │ ↑ │ ↑ │
│ offset │ offset │ offset │
└─────────────────┴─────────────────┴─────────────────────────┘
│ │ │
▼ ▼ ▼
Consumer A Consumer B Consumer C
(Group: X) (Group: X) (Group: X)
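The producer chooses a partition from the message key, so every event with the same key lands in the same partition and stays ordered. Kafka's default partitioner hashes keys with murmur2; this sketch substitutes a toy FNV-1a hash to show the principle:
// Sketch: deterministic key -> partition mapping. Kafka uses murmur2;
// the FNV-1a hash here is just for illustration.
function partitionFor(key, numPartitions) {
  let hash = 2166136261;
  for (const ch of key) {
    hash ^= ch.charCodeAt(0);
    hash = Math.imul(hash, 16777619);
  }
  return (hash >>> 0) % numPartitions; // same key -> same partition, every time
}

partitionFor('user-42', 3); // always the same partition for 'user-42'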
Delivery Semantics
- At-Most-Once: Fire and forget. Message may be lost. Fastest but unreliable.
Producer → Broker (no ack) → Consumer (no ack)
- At-Least-Once: Guaranteed delivery, but may duplicate. Consumers must be idempotent.
Producer → Broker (ack) → Consumer (ack after processing)
If the consumer crashes before acking, the message is redelivered.
- Exactly-Once: Each message is processed exactly once. Requires transactions or idempotency.
Producer (idempotent) → Broker (transactional) → Consumer (transactional)
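Because at-least-once delivery can hand the same message to a consumer twice, the practical defense is an idempotent handler. A sketch, where the in-memory Set stands in for a durable store (Redis SETNX, a unique database constraint, etc.) and doWork/ack are placeholders:
// Sketch: idempotent consumer for at-least-once delivery.
const processedIds = new Set(); // stand-in for a durable dedup store

async function handle(message) {
  if (processedIds.has(message.id)) {
    await message.ack(); // already done - just acknowledge the redelivery
    return;
  }
  await doWork(message);        // the actual side effect (placeholder)
  processedIds.add(message.id); // record success before acking
  await message.ack();          // a crash before this line only causes a redelivery
}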
Worker Patterns
Competing Consumers: Multiple workers pull from the same queue
┌── Worker 1 ──┐
Queue ─────────────┼── Worker 2 ──┼─────────▶ Results
└── Worker 3 ──┘
Fan-out: One message goes to many consumers
┌── Service A
Exchange ──────────┼── Service B
└── Service C
Request-Reply: RPC over message queue
Client → Request Queue → Worker → Reply Queue → Client
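What makes request-reply work are two properties on every request: a replyTo queue and a correlationId the client uses to match responses to in-flight requests. A sketch against a hypothetical broker client:
const crypto = require('node:crypto');

// Sketch: RPC over a queue; 'broker' is a hypothetical client API.
const pending = new Map(); // correlationId -> resolve()

function rpc(broker, payload) {
  const correlationId = crypto.randomUUID();
  broker.publish('request-queue', payload, {
    correlationId,
    replyTo: 'reply-queue-client-1',
  });
  return new Promise((resolve) => pending.set(correlationId, resolve));
}

// One consumer on the reply queue routes responses back to waiting callers
function startReplyConsumer(broker) {
  broker.consume('reply-queue-client-1', (msg) => {
    const resolve = pending.get(msg.correlationId);
    if (resolve) {
      pending.delete(msg.correlationId);
      resolve(msg.body);
    }
  });
}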
System Comparison
| System | Type | Best For | Language | Complexity |
|---|---|---|---|---|
| RabbitMQ | Message Broker | Complex routing, RPC, mixed patterns | Erlang | Medium |
| Kafka | Event Streaming | High throughput, event sourcing, replay | Java/Scala | High |
| Redis + BullMQ | Job Queue | Simple background jobs, Node.js apps | C | Low |
| Amazon SQS | Managed Queue | AWS-native, serverless, simple queuing | Managed | Low |
| NATS JetStream | Lightweight Streaming | Edge computing, low latency, simple ops | Go | Low |
| Celery | Task Queue Framework | Python apps, scheduled tasks | Python | Medium |
| Sidekiq | Job Queue | Ruby/Rails apps, Redis-backed | Ruby | Low |
When to Choose Each
| Scenario | Recommended System |
|---|---|
| Background jobs in a web app | Redis + BullMQ/Sidekiq |
| Microservices communication | RabbitMQ |
| Real-time analytics pipeline | Kafka |
| Event sourcing architecture | Kafka |
| Simple task distribution (AWS) | SQS |
| IoT/Edge with low resources | NATS JetStream |
| Python/Django background tasks | Celery + Redis/RabbitMQ |
Project List
Projects are ordered from fundamental understanding to production-ready implementations.
Project 1: Build a Simple In-Memory Job Queue
- File: LEARN_JOB_QUEUES_DEEP_DIVE.md
- Main Programming Language: JavaScript (Node.js)
- Alternative Programming Languages: Python, Go, Rust
- Coolness Level: Level 2: Practical but Forgettable
- Business Potential: 1. The “Resume Gold”
- Difficulty: Level 1: Beginner
- Knowledge Area: Data Structures / Concurrency
- Software or Tool: None (pure implementation)
- Main Book: “Designing Data-Intensive Applications” by Martin Kleppmann
What you’ll build: A basic in-memory job queue with producer/consumer pattern, job priorities, delayed jobs, and concurrent workers—no external dependencies.
Why it teaches job queues: Before using Redis or RabbitMQ, you need to understand what a queue fundamentally is. This project strips away all abstraction to show the core mechanics: enqueue, dequeue, worker loops, and job lifecycle.
Core challenges you’ll face:
- Implementing thread-safe queue operations → maps to why Redis uses single-threaded model
- Managing job priorities → maps to priority queues in production systems
- Handling delayed jobs → maps to scheduled jobs in BullMQ/Sidekiq
- Coordinating multiple workers → maps to competing consumers pattern
Key Concepts:
- Queue Data Structures: “Algorithms, Fourth Edition” Chapter 1.3 - Sedgewick & Wayne
- Producer-Consumer Pattern: “Java Concurrency in Practice” Chapter 5 - Brian Goetz
- Event Loop: Node.js Event Loop Guide
- Priority Queues: “Introduction to Algorithms” Chapter 6 - CLRS
Difficulty: Beginner | Time estimate: Weekend | Prerequisites: Basic programming, understanding of arrays/objects
Real world outcome:
const queue = createJobQueue({ concurrency: 3 });
// Register job processor
queue.process('email', async (job) => {
console.log(`Sending email to ${job.data.to}`);
await sendEmail(job.data);
return { sent: true };
});
// Add jobs
queue.add('email', { to: 'user@example.com', subject: 'Welcome!' });
queue.add('email', { to: 'other@example.com', subject: 'Hello!' }, { priority: 1 });
queue.add('email', { to: 'delayed@example.com', subject: 'Later!' }, { delay: 5000 });
// Start processing
queue.start();
$ node queue.js
[Worker 1] Processing job email-2: Sending email to other@example.com (priority: 1)
[Worker 2] Processing job email-1: Sending email to user@example.com
[Worker 1] Completed job email-2 in 150ms
[Worker 2] Completed job email-1 in 200ms
[Worker 1] Processing job email-3: Sending email to delayed@example.com (was delayed 5000ms)
[Worker 1] Completed job email-3 in 120ms
Implementation Hints:
The core queue structure:
function createJobQueue(options = {}) {
const jobs = []; // Pending jobs
const delayed = []; // Jobs with future execution time
const processing = new Map(); // Currently processing
const completed = []; // Finished jobs
const failed = []; // Failed jobs
const processors = new Map(); // Registered handlers
let jobIdCounter = 0;
let running = false;
function add(type, data, opts = {}) {
const job = {
id: `${type}-${++jobIdCounter}`,
type,
data,
priority: opts.priority || 0,
createdAt: Date.now(),
processAt: opts.delay ? Date.now() + opts.delay : Date.now(),
attempts: 0,
maxAttempts: opts.attempts || 3
};
if (opts.delay) {
delayed.push(job);
delayed.sort((a, b) => a.processAt - b.processAt);
} else {
insertByPriority(jobs, job);
}
return job;
}
// ...more methods
}
Priority insertion:
function insertByPriority(queue, job) {
// Higher priority = earlier in queue
const index = queue.findIndex(j => j.priority < job.priority);
if (index === -1) {
queue.push(job);
} else {
queue.splice(index, 0, job);
}
}
Worker loop:
async function workerLoop(workerId) {
while (running) {
// Move delayed jobs that are ready
moveReadyDelayedJobs();
const job = jobs.shift();
if (!job) {
await sleep(100); // Poll interval
continue;
}
const processor = processors.get(job.type);
if (!processor) {
failed.push({ ...job, error: 'No processor registered' });
continue;
}
processing.set(job.id, job);
job.startedAt = Date.now();
job.attempts++;
try {
job.result = await processor(job);
job.completedAt = Date.now();
completed.push(job);
} catch (err) {
if (job.attempts < job.maxAttempts) {
job.processAt = Date.now() + exponentialBackoff(job.attempts);
delayed.push(job);
delayed.sort((a, b) => a.processAt - b.processAt); // keep soonest-first, matching add()
} else {
job.error = err.message;
failed.push(job);
}
} finally {
processing.delete(job.id);
}
}
}
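The loop above leans on three helpers that were left out. Minimal versions, written to live inside createJobQueue next to the arrays they touch:
function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

function exponentialBackoff(attempt) {
  // 1s, 2s, 4s, ... capped at 30s, with ~10% jitter to avoid lockstep retries
  const base = Math.min(1000 * 2 ** (attempt - 1), 30000);
  return base + Math.random() * base * 0.1;
}

function moveReadyDelayedJobs() {
  const now = Date.now();
  // 'delayed' stays sorted by processAt, so ready jobs sit at the front
  while (delayed.length > 0 && delayed[0].processAt <= now) {
    insertByPriority(jobs, delayed.shift());
  }
}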
Learning milestones:
- Basic enqueue/dequeue works → You understand FIFO queues
- Priority jobs work → You understand priority queues
- Delayed jobs execute on time → You understand scheduled execution
- Multiple workers process concurrently → You understand competing consumers
Project 2: Redis-Based Job Queue (Build BullMQ from Scratch)
- File: LEARN_JOB_QUEUES_DEEP_DIVE.md
- Main Programming Language: JavaScript (Node.js)
- Alternative Programming Languages: Python, Go, Ruby
- Coolness Level: Level 4: Hardcore Tech Flex
- Business Potential: 2. The “Micro-SaaS / Pro Tool”
- Difficulty: Level 3: Advanced
- Knowledge Area: Redis / Distributed Systems
- Software or Tool: Redis
- Main Book: “Redis in Action” by Josiah L. Carlson
What you’ll build: A distributed job queue backed by Redis, using BRPOP for blocking dequeue, Lua scripts for atomic operations, and sorted sets for delayed jobs—like BullMQ but understanding every piece.
Why it teaches job queues: Redis is the backbone of most job queues (Sidekiq, Bull, BullMQ, Resque). This project shows you why: atomic operations, blocking pops, sorted sets for scheduling, and pub/sub for events.
Core challenges you’ll face:
- Atomic job claiming with BRPOPLPUSH → maps to preventing double-processing
- Delayed jobs with sorted sets → maps to ZRANGEBYSCORE pattern
- Job locking to prevent duplicates → maps to distributed locking
- Reliable message acknowledgment → maps to visibility timeout pattern
- Worker heartbeats → maps to detecting dead workers
Key Concepts:
- Redis Data Structures: “Redis in Action” Chapters 1-3 - Josiah Carlson
- BRPOP/BLPOP: Redis Blocking Operations
- Lua Scripting in Redis: Redis Lua Scripts
- BullMQ Architecture: BullMQ Documentation
Difficulty: Advanced | Time estimate: 1-2 weeks | Prerequisites: Project 1, basic Redis knowledge
Real world outcome:
// Worker 1
const queue = createRedisQueue('emails', { redis: 'redis://localhost:6379' });
queue.process(async (job) => {
console.log(`[Worker ${process.pid}] Processing ${job.id}`);
await sendEmail(job.data);
return { sent: true, timestamp: Date.now() };
}, { concurrency: 5 });
// Worker 2 (separate process)
const queue2 = createRedisQueue('emails', { redis: 'redis://localhost:6379' });
queue2.process(async (job) => { /* same handler */ }, { concurrency: 5 });
// Producer (any process)
const producer = createRedisQueue('emails', { redis: 'redis://localhost:6379' });
await producer.add({ to: 'user@example.com', subject: 'Hello' });
await producer.add({ to: 'vip@example.com', subject: 'Important' }, { priority: 10 });
await producer.add({ to: 'later@example.com', subject: 'Tomorrow' }, { delay: 86400000 });
// Schedule recurring job
await producer.add({ type: 'daily-report' }, { repeat: { cron: '0 9 * * *' } });
# Terminal 1 - Worker
$ node worker.js
[Worker 12345] Connected to Redis
[Worker 12345] Processing email-1: user@example.com
[Worker 12345] Completed email-1 in 150ms
[Worker 12345] Processing email-3: later@example.com (was delayed)
# Terminal 2 - Worker
$ node worker.js
[Worker 12346] Connected to Redis
[Worker 12346] Processing email-2: vip@example.com (priority: 10)
[Worker 12346] Completed email-2 in 120ms
# Redis CLI
$ redis-cli
> KEYS bull:emails:*
1) "bull:emails:waiting"
2) "bull:emails:active"
3) "bull:emails:completed"
4) "bull:emails:failed"
5) "bull:emails:delayed"
Implementation Hints:
Redis key structure:
bull:{queueName}:waiting → List (FIFO queue)
bull:{queueName}:active → List (currently processing)
bull:{queueName}:delayed → Sorted Set (score = timestamp)
bull:{queueName}:completed → List (finished jobs)
bull:{queueName}:failed → List (failed jobs)
bull:{queueName}:job:{id} → Hash (job data)
bull:{queueName}:id → String (job ID counter)
Atomic job claiming with Lua:
-- KEYS[1] = waiting queue
-- KEYS[2] = active queue
-- KEYS[3] = job data hash prefix
-- ARGV[1] = worker ID
-- ARGV[2] = current timestamp
local jobId = redis.call('RPOPLPUSH', KEYS[1], KEYS[2])
if jobId then
local jobKey = KEYS[3] .. jobId
redis.call('HSET', jobKey, 'processedBy', ARGV[1], 'startedAt', ARGV[2])
local jobData = redis.call('HGETALL', jobKey)
return {jobId, jobData}
end
return nil
Moving delayed jobs (run periodically):
-- Move delayed jobs that are ready to waiting queue
local now = ARGV[1]
local jobs = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', now)
for i, jobId in ipairs(jobs) do
redis.call('ZREM', KEYS[1], jobId)
redis.call('LPUSH', KEYS[2], jobId)
end
return #jobs
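To run these scripts atomically from Node, one option is ioredis's defineCommand, which registers a Lua script as a method on the client. A sketch, assuming the script above is stored in a moveDelayedLua string:
const Redis = require('ioredis');
const redis = new Redis('redis://localhost:6379');

// ioredis sends the script once and reuses it via EVALSHA afterwards
redis.defineCommand('moveDelayed', { numberOfKeys: 2, lua: moveDelayedLua });

// KEYS[1] = delayed sorted set, KEYS[2] = waiting list, ARGV[1] = now
setInterval(async () => {
  const moved = await redis.moveDelayed(
    'bull:emails:delayed',
    'bull:emails:waiting',
    Date.now()
  );
  if (moved > 0) console.log(`Promoted ${moved} delayed job(s)`);
}, 1000);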
Blocking dequeue with timeout:
async function getNextJob(timeout = 5) {
// BRPOPLPUSH blocks until job available or timeout
const result = await redis.brpoplpush(
`bull:${queueName}:waiting`,
`bull:${queueName}:active`,
timeout
);
if (!result) return null; // Timeout
const jobId = result;
const jobData = await redis.hgetall(`bull:${queueName}:job:${jobId}`);
return { id: jobId, ...parseJobData(jobData) };
}
Job acknowledgment (on completion):
async function completeJob(job, result) {
const multi = redis.multi();
// Remove from active
multi.lrem(`bull:${queueName}:active`, 1, job.id);
// Update job data
multi.hset(`bull:${queueName}:job:${job.id}`,
'completedAt', Date.now(),
'result', JSON.stringify(result)
);
// Add to completed (with optional trimming)
multi.lpush(`bull:${queueName}:completed`, job.id);
multi.ltrim(`bull:${queueName}:completed`, 0, 999); // Keep last 1000
await multi.exec();
}
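The failure path is the mirror image: remove the job from active, then either park it in the delayed sorted set with a retry timestamp or push it to failed. A sketch consistent with the key layout above (backoffMs is whatever retry policy you choose):
async function failJob(job, err) {
  const multi = redis.multi();
  multi.lrem(`bull:${queueName}:active`, 1, job.id);
  multi.hset(`bull:${queueName}:job:${job.id}`, 'failedReason', err.message);
  if (job.attempts < job.maxAttempts) {
    // Score in the delayed zset = earliest time the retry may run
    const retryAt = Date.now() + backoffMs(job.attempts);
    multi.zadd(`bull:${queueName}:delayed`, retryAt, job.id);
  } else {
    multi.lpush(`bull:${queueName}:failed`, job.id);
  }
  await multi.exec();
}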
Learning milestones:
- Jobs are claimed atomically → You understand distributed locking
- Workers across processes share work → You understand competing consumers
- Delayed jobs execute on time → You understand sorted sets
- Failed jobs retry with backoff → You understand reliability patterns
Project 3: Implement the AMQP Protocol (Mini-RabbitMQ Client)
- File: LEARN_JOB_QUEUES_DEEP_DIVE.md
- Main Programming Language: JavaScript (Node.js)
- Alternative Programming Languages: Python, Go, Java
- Coolness Level: Level 5: Pure Magic (Super Cool)
- Business Potential: 1. The “Resume Gold”
- Difficulty: Level 4: Expert
- Knowledge Area: Network Protocols / Binary Parsing
- Software or Tool: RabbitMQ server
- Main Book: “RabbitMQ in Depth” by Gavin M. Roy
What you’ll build: An AMQP 0-9-1 client that speaks the wire protocol—connect to RabbitMQ, declare queues and exchanges, publish messages, and consume with acknowledgments.
Why it teaches job queues: AMQP is the protocol that powers RabbitMQ. Understanding it at the wire level shows you what libraries like amqplib abstract away: framing, channels, flow control, and the handshake protocol.
Core challenges you’ll face:
- Implementing the AMQP handshake → maps to protocol negotiation
- Parsing binary frames → maps to wire protocol understanding
- Managing channels → maps to multiplexing over TCP
- Handling flow control → maps to backpressure
- Implementing heartbeats → maps to connection health
Key Concepts:
- AMQP 0-9-1 Specification: RabbitMQ AMQP Concepts
- Binary Protocols: “High Performance Browser Networking” Chapter 2 - Ilya Grigorik
- AMQP Frame Format: AMQP Specification
- Channel Multiplexing: “RabbitMQ in Depth” Chapter 3 - Gavin Roy
Difficulty: Expert | Time estimate: 2-3 weeks | Prerequisites: Projects 1-2, TCP/binary data, buffer manipulation
Real world outcome:
const client = new AMQPClient('amqp://localhost:5672');
await client.connect();
console.log('Connected to RabbitMQ');
const channel = await client.createChannel();
// Declare exchange and queue
await channel.exchangeDeclare('logs', 'fanout', { durable: true });
await channel.queueDeclare('log-processor', { durable: true });
await channel.queueBind('log-processor', 'logs', '');
// Publish message
await channel.basicPublish('logs', '', Buffer.from(JSON.stringify({
level: 'info',
message: 'User logged in',
timestamp: Date.now()
})), {
contentType: 'application/json',
deliveryMode: 2 // Persistent
});
// Consume messages
await channel.basicConsume('log-processor', async (msg) => {
console.log('Received:', JSON.parse(msg.content.toString()));
await channel.basicAck(msg.deliveryTag);
}, { noAck: false });
$ node amqp-client.js
AMQP Handshake: Sent protocol header
AMQP Handshake: Received Connection.Start
AMQP Handshake: Sent Connection.StartOk
AMQP Handshake: Received Connection.Tune (channel_max=2047, frame_max=131072, heartbeat=60)
AMQP Handshake: Sent Connection.TuneOk
AMQP Handshake: Sent Connection.Open
AMQP Handshake: Received Connection.OpenOk
Connected to RabbitMQ
Channel 1: Sent Exchange.Declare
Channel 1: Received Exchange.DeclareOk
Channel 1: Sent Queue.Declare
Channel 1: Received Queue.DeclareOk (queue=log-processor, messages=0, consumers=0)
Publishing message...
Received: { level: 'info', message: 'User logged in', timestamp: 1705312200000 }
Implementation Hints:
AMQP frame structure (all frames):
+----------+----------+-----------+-----------+----------+
| Type (1) | Chan (2) | Size (4) | Payload | End (1) |
+----------+----------+-----------+-----------+----------+
0x01 0x0001 0x0000xx ... 0xCE
Frame types:
const FRAME_METHOD = 1;
const FRAME_HEADER = 2;
const FRAME_BODY = 3;
const FRAME_HEARTBEAT = 8;
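Frames arrive as a raw TCP byte stream, so the reader has to buffer until a complete frame is present and validate the 0xCE frame-end octet. A sketch of that framing logic:
// Sketch: pull one complete frame out of an accumulating TCP buffer.
// Returns null if the buffer doesn't yet hold a whole frame.
function readFrame(buffer) {
  if (buffer.length < 7) return null; // type(1) + channel(2) + size(4)
  const type = buffer.readUInt8(0);
  const channel = buffer.readUInt16BE(1);
  const size = buffer.readUInt32BE(3);
  if (buffer.length < 7 + size + 1) return null; // payload + end octet not here yet
  if (buffer.readUInt8(7 + size) !== 0xce) {
    throw new Error('Invalid frame end marker');
  }
  return {
    type,
    channel,
    payload: buffer.slice(7, 7 + size),
    rest: buffer.slice(7 + size + 1), // leftover bytes start the next frame
  };
}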
Connection handshake sequence:
async function connect() {
// 1. Send protocol header
socket.write(Buffer.from('AMQP\x00\x00\x09\x01'));
// 2. Receive Connection.Start
const startFrame = await receiveFrame();
// Contains: version, server properties, mechanisms, locales
// 3. Send Connection.StartOk
await sendMethod(0, CONNECTION_START_OK, {
clientProperties: { product: 'MiniAMQP', version: '0.1' },
mechanism: 'PLAIN',
response: `\x00${username}\x00${password}`,
locale: 'en_US'
});
// 4. Receive Connection.Tune
const tuneFrame = await receiveFrame();
// Contains: channel_max, frame_max, heartbeat
// 5. Send Connection.TuneOk
await sendMethod(0, CONNECTION_TUNE_OK, {
channelMax: tuneFrame.channelMax,
frameMax: tuneFrame.frameMax,
heartbeat: tuneFrame.heartbeat
});
// 6. Send Connection.Open
await sendMethod(0, CONNECTION_OPEN, { virtualHost: '/' });
// 7. Receive Connection.OpenOk
await receiveFrame();
}
Parsing method frames:
function parseMethodFrame(buffer) {
const classId = buffer.readUInt16BE(0);
const methodId = buffer.readUInt16BE(2);
switch (classId) {
case 10: // Connection
return parseConnectionMethod(methodId, buffer.slice(4));
case 20: // Channel
return parseChannelMethod(methodId, buffer.slice(4));
case 40: // Exchange
return parseExchangeMethod(methodId, buffer.slice(4));
case 50: // Queue
return parseQueueMethod(methodId, buffer.slice(4));
case 60: // Basic
return parseBasicMethod(methodId, buffer.slice(4));
// ...
}
}
Publishing a message (requires 3 frames):
async function basicPublish(exchange, routingKey, body, properties = {}) {
// Frame 1: Method frame (Basic.Publish)
await sendMethod(channelId, BASIC_PUBLISH, {
exchange,
routingKey,
mandatory: false,
immediate: false
});
// Frame 2: Content header frame
await sendContentHeader(channelId, body.length, properties);
// Frame 3+: Content body frame(s)
// Split body if larger than frame_max
for (const chunk of splitBody(body, frameMax - 8)) {
await sendContentBody(channelId, chunk);
}
}
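splitBody only has to chunk the payload so no body frame exceeds the negotiated frame_max minus the 8 bytes of frame overhead. A minimal version:
function* splitBody(body, maxChunk) {
  // Yield successive slices of at most maxChunk bytes
  for (let offset = 0; offset < body.length; offset += maxChunk) {
    yield body.slice(offset, offset + maxChunk);
  }
}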
Learning milestones:
- Handshake completes successfully → You understand protocol negotiation
- Queue operations work → You understand AMQP methods
- Messages publish and consume → You understand the full flow
- Heartbeats keep connection alive → You understand connection management
Project 4: Build a Kafka-Style Event Log
- File: LEARN_JOB_QUEUES_DEEP_DIVE.md
- Main Programming Language: Go
- Alternative Programming Languages: Rust, Java, C
- Coolness Level: Level 5: Pure Magic (Super Cool)
- Business Potential: 4. The “Open Core” Infrastructure
- Difficulty: Level 5: Master
- Knowledge Area: Distributed Systems / Storage Engines
- Software or Tool: None (build from scratch)
- Main Book: “Designing Data-Intensive Applications” by Martin Kleppmann
What you’ll build: A simplified Kafka-like commit log with topics, partitions, segment files, consumer groups with offset tracking, and replication—the core of event streaming.
Why it teaches job queues: Kafka isn’t really a queue—it’s a distributed commit log. Building one shows you why Kafka can replay messages, why partitions enable parallelism, and why it scales differently than RabbitMQ.
Core challenges you’ll face:
- Append-only log with segments → maps to Kafka’s storage model
- Partition assignment → maps to consumer group coordination
- Offset management → maps to consumer position tracking
- Leader election → maps to high availability
- Compacted topics → maps to log compaction
Key Concepts:
- Commit Logs: “Designing Data-Intensive Applications” Chapter 11 - Martin Kleppmann
- Kafka Architecture: Confluent Kafka Architecture Course
- Raft Consensus: Raft Paper
- Segment Files: Kafka Internals
Difficulty: Master | Time estimate: 3-4 weeks | Prerequisites: Projects 1-3, file I/O, networking
Real world outcome:
// Create a topic with 3 partitions
topic, err := broker.CreateTopic("user-events", TopicConfig{
Partitions: 3,
ReplicationFactor: 2,
RetentionMs: 7 * 24 * 60 * 60 * 1000, // 7 days
})
// Producer
producer := broker.NewProducer()
for i := 0; i < 1000; i++ {
event := Event{
Key: fmt.Sprintf("user-%d", i%100),
Value: []byte(fmt.Sprintf(`{"action":"click","userId":%d}`, i%100)),
}
// Key determines partition (consistent hashing)
offset, partition, err := producer.Send("user-events", event)
fmt.Printf("Sent to partition %d at offset %d\n", partition, offset)
}
// Consumer group
group := broker.NewConsumerGroup("analytics-service")
group.Subscribe("user-events")
for msg := range group.Messages() {
fmt.Printf("[P%d:O%d] %s\n", msg.Partition, msg.Offset, msg.Value)
group.Commit(msg) // Commit offset
}
$ ./minilog broker --data-dir=/tmp/minilog --port=9092
[Broker] Starting MiniLog broker on :9092
[Broker] Created topic 'user-events' with 3 partitions
$ ./minilog producer --topic=user-events
Sent 1000 messages (334 to P0, 333 to P1, 333 to P2)
$ ./minilog consumer --group=analytics --topic=user-events
[Consumer analytics-1] Assigned partitions: [0, 1]
[Consumer analytics-2] Assigned partitions: [2]
[P0:O0] {"action":"click","userId":0}
[P0:O1] {"action":"click","userId":3}
[P1:O0] {"action":"click","userId":1}
...
# Data files
$ ls /tmp/minilog/user-events-0/
00000000000000000000.log # Segment file
00000000000000000000.index # Offset index
00000000000000000000.timeindex # Timestamp index
Implementation Hints:
Segment file structure:
Log Segment (00000000000000000000.log):
+--------+--------+--------+--------+--------+--------+
| Offset | Size | CRC | Timestamp | Key | Value |
| (8B) | (4B) | (4B) | (8B) | (var) | (var) |
+--------+--------+--------+--------+--------+--------+
| 0 | 42 | 0xABC | 170531... | user-1| {...} |
| 1 | 38 | 0xDEF | 170531... | user-2| {...} |
| ... | | | | | |
+--------+--------+--------+--------+--------+--------+
Index File (00000000000000000000.index):
+--------+----------+
| Offset | Position |
| (4B) | (4B) |
+--------+----------+
| 0 | 0 | ← Offset 0 is at byte 0
| 100 | 4200 | ← Offset 100 is at byte 4200
| 200 | 8400 | ← Sparse index (every N offsets)
+--------+----------+
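The sparse index turns "find offset N" into a binary search for the nearest indexed entry at or below N, followed by a short forward scan of the log file. Sketched in JavaScript for brevity, though the project itself is in Go:
// Sketch: find the byte position to start scanning from for a target offset.
// 'index' is the sparse index loaded into memory: [{ offset, position }, ...]
function seekPosition(index, targetOffset) {
  let lo = 0, hi = index.length - 1;
  let best = { offset: 0, position: 0 };
  while (lo <= hi) {
    const mid = (lo + hi) >> 1;
    if (index[mid].offset <= targetOffset) {
      best = index[mid]; // closest indexed entry at or below the target
      lo = mid + 1;
    } else {
      hi = mid - 1;
    }
  }
  return best.position; // scan forward from here to the exact offset
}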
Partition structure:
type Partition struct {
ID int
TopicName string
DataDir string
Segments []*Segment
ActiveSegment *Segment
HighWatermark int64 // Last committed offset
LogEndOffset int64 // Next offset to write
mu sync.RWMutex
}
func (p *Partition) Append(key, value []byte) (int64, error) {
p.mu.Lock()
defer p.mu.Unlock()
// Check if we need a new segment
if p.ActiveSegment.Size() > maxSegmentSize {
if err := p.rollSegment(); err != nil {
return 0, err
}
}
offset := p.LogEndOffset
entry := &LogEntry{
Offset: offset,
Timestamp: time.Now().UnixMilli(),
Key: key,
Value: value,
}
if err := p.ActiveSegment.Append(entry); err != nil {
return 0, err
}
p.LogEndOffset++
return offset, nil
}
Consumer group coordination:
type ConsumerGroup struct {
Name string
Members map[string]*Consumer // memberId -> consumer
Assignments map[int]string // partitionId -> memberId
Offsets map[int]int64 // partitionId -> committed offset
Generation int // Incremented on rebalance
}
func (g *ConsumerGroup) Rebalance() {
g.Generation++
// Get all partitions for subscribed topics
partitions := g.getPartitions()
// Assign partitions to consumers (round-robin or range)
g.Assignments = make(map[int]string)
members := g.getMemberList()
for i, p := range partitions {
member := members[i % len(members)]
g.Assignments[p.ID] = member.ID
}
// Notify all consumers of new assignments
for _, member := range g.Members {
member.OnAssignment(g.getAssignment(member.ID), g.Generation)
}
}
Learning milestones:
- Append and read from log works → You understand commit logs
- Partitioning distributes data → You understand parallelism
- Consumer groups track offsets → You understand consumption model
- Rebalancing redistributes partitions → You understand coordination
Project 5: Dead Letter Queue with Retry Mechanisms
- File: LEARN_JOB_QUEUES_DEEP_DIVE.md
- Main Programming Language: Python
- Alternative Programming Languages: JavaScript, Go, Java
- Coolness Level: Level 3: Genuinely Clever
- Business Potential: 3. The “Service & Support” Model
- Difficulty: Level 2: Intermediate
- Knowledge Area: Reliability Patterns / Error Handling
- Software or Tool: Redis or RabbitMQ
- Main Book: “Release It!” by Michael Nygard
What you’ll build: A robust retry system with exponential backoff, jitter, dead letter queues, and automatic poison pill detection—handling failures gracefully at scale.
Why it teaches job queues: Production job queues fail. Networks go down, services timeout, bugs crash workers. This project teaches you how to build systems that degrade gracefully and recover automatically.
Core challenges you’ll face:
- Exponential backoff with jitter → maps to preventing thundering herd
- Distinguishing transient vs permanent failures → maps to error classification
- Moving to dead letter queue → maps to isolating poison pills
- Replaying dead letters → maps to manual intervention patterns
- Circuit breaker integration → maps to protecting downstream services
Key Concepts:
- Exponential Backoff: AWS Architecture Blog
- Dead Letter Queues: AWS SQS DLQ
- Circuit Breaker: “Release It!” Chapter 5 - Michael Nygard
- Retry Patterns: Microsoft Retry Pattern
Difficulty: Intermediate | Time estimate: 1 week | Prerequisites: Projects 1-2, error handling basics
Real world outcome:
from job_queue import Queue, RetryPolicy, DeadLetterQueue
queue = Queue('payment-processing', redis_url='redis://localhost:6379')
dlq = DeadLetterQueue('payment-processing-dlq', redis_url='redis://localhost:6379')
retry_policy = RetryPolicy(
max_attempts=5,
base_delay=1.0, # 1 second
max_delay=300.0, # 5 minutes
exponential_base=2, # 2^attempt
jitter=0.1, # ±10% randomness
retryable_exceptions=[TimeoutError, ConnectionError],
fatal_exceptions=[ValidationError, AuthenticationError]
)
@queue.worker(retry_policy=retry_policy, dlq=dlq)
async def process_payment(job):
"""Process a payment - may fail transiently."""
payment = job.data
# This might timeout or fail
result = await payment_gateway.charge(
amount=payment['amount'],
card_token=payment['card_token']
)
if result.status == 'declined':
raise ValidationError('Card declined') # Fatal - goes to DLQ immediately
return {'transaction_id': result.id}
# Add a job
await queue.add({
'amount': 99.99,
'card_token': 'tok_visa_4242',
'user_id': 123
})
# Start workers
await queue.start(concurrency=10)
$ python worker.py
[Worker] Processing job pay-001 (attempt 1/5)
[Worker] Job pay-001 failed: ConnectionError - retrying in 1.05s
[Worker] Processing job pay-001 (attempt 2/5)
[Worker] Job pay-001 failed: ConnectionError - retrying in 2.18s
[Worker] Processing job pay-001 (attempt 3/5)
[Worker] Job pay-001 completed: {'transaction_id': 'ch_abc123'}
[Worker] Processing job pay-002 (attempt 1/5)
[Worker] Job pay-002 FATAL: ValidationError('Card declined') - moving to DLQ
[DLQ] Received job pay-002 - reason: ValidationError('Card declined')
# Check DLQ
$ python dlq_admin.py list
pay-002: ValidationError('Card declined') at 2024-01-15T10:30:00Z
Original data: {'amount': 50.00, 'card_token': 'tok_expired', 'user_id': 456}
Attempts: 1
$ python dlq_admin.py replay pay-002 --queue=payment-processing
[DLQ] Replayed pay-002 to payment-processing
Implementation Hints:
Exponential backoff with jitter:
import random
def calculate_backoff(attempt: int, policy: RetryPolicy) -> float:
"""Calculate delay for next retry attempt."""
# Exponential growth: base_delay * exponential_base ** (attempt - 1), so attempt 1 waits ~base_delay
delay = policy.base_delay * (policy.exponential_base ** (attempt - 1))
# Cap at max delay
delay = min(delay, policy.max_delay)
# Add jitter (randomness to prevent thundering herd)
jitter_range = delay * policy.jitter
delay += random.uniform(-jitter_range, jitter_range)
return max(0, delay)
# Example: base=1, exp=2, attempts 1-5
# Attempt 1: ~1s (0.9-1.1)
# Attempt 2: ~2s (1.8-2.2)
# Attempt 3: ~4s (3.6-4.4)
# Attempt 4: ~8s (7.2-8.8)
# Attempt 5: ~16s (14.4-17.6)
Error classification:
def classify_error(error: Exception, policy: RetryPolicy) -> str:
"""Determine if error is retryable or fatal."""
error_type = type(error)
# Explicitly fatal
if any(isinstance(error, exc) for exc in policy.fatal_exceptions):
return 'fatal'
# Explicitly retryable
if any(isinstance(error, exc) for exc in policy.retryable_exceptions):
return 'retryable'
# HTTP status codes
if hasattr(error, 'status_code'):
if error.status_code in [429, 500, 502, 503, 504]:
return 'retryable'
if error.status_code in [400, 401, 403, 404]:
return 'fatal'
# Default: retryable (safer)
return 'retryable'
Worker retry loop:
async def process_with_retry(job, handler, policy, dlq):
attempt = 0
while attempt < policy.max_attempts:
attempt += 1
job.attempts = attempt
try:
result = await handler(job)
return result # Success!
except Exception as error:
classification = classify_error(error, policy)
if classification == 'fatal':
# Immediately move to DLQ
await dlq.add(job, reason=str(error))
raise FatalError(error)
if attempt >= policy.max_attempts:
# Exhausted retries - move to DLQ
await dlq.add(job, reason=f'Exhausted {attempt} attempts: {error}')
raise MaxRetriesExceeded(error)
# Calculate backoff and wait
delay = calculate_backoff(attempt, policy)
logger.warning(f'Job {job.id} failed (attempt {attempt}), retrying in {delay:.2f}s')
await asyncio.sleep(delay)
Dead Letter Queue structure:
class DeadLetterQueue:
def __init__(self, name, redis_url):
self.name = name
self.redis = Redis.from_url(redis_url)
async def add(self, job, reason):
dlq_entry = {
'original_job': job.to_dict(),
'reason': reason,
'failed_at': datetime.utcnow().isoformat(),
'attempts': job.attempts,
'source_queue': job.queue_name
}
await self.redis.lpush(f'dlq:{self.name}', json.dumps(dlq_entry))
async def replay(self, job_id, target_queue=None):
"""Move job back to original (or specified) queue for retry."""
entries = await self.redis.lrange(f'dlq:{self.name}', 0, -1)
for i, entry_json in enumerate(entries):
entry = json.loads(entry_json)
if entry['original_job']['id'] == job_id:
# Remove from DLQ
await self.redis.lrem(f'dlq:{self.name}', 1, entry_json)
# Add back to queue
queue_name = target_queue or entry['source_queue']
await self.redis.lpush(f'queue:{queue_name}', json.dumps(entry['original_job']))
return True
return False
Learning milestones:
- Transient failures retry automatically → You understand retry patterns
- Fatal errors go directly to DLQ → You understand error classification
- Backoff prevents overload → You understand exponential backoff
- DLQ entries can be replayed → You understand recovery patterns
Project 6: Distributed Worker Pool with Health Checks
- File: LEARN_JOB_QUEUES_DEEP_DIVE.md
- Main Programming Language: Go
- Alternative Programming Languages: Rust, Java, Python
- Coolness Level: Level 4: Hardcore Tech Flex
- Business Potential: 3. The “Service & Support” Model
- Difficulty: Level 3: Advanced
- Knowledge Area: Distributed Systems / Monitoring
- Software or Tool: Redis + Custom coordination
- Main Book: “Designing Distributed Systems” by Brendan Burns
What you’ll build: A distributed worker system with heartbeats, automatic worker registration, stuck job detection, graceful shutdown, and horizontal scaling—like Celery/Sidekiq’s worker management.
Why it teaches job queues: Individual workers are easy. Managing a fleet of workers across multiple machines—knowing which are alive, redistributing work when one dies, scaling up and down—is the hard part.
Core challenges you’ll face:
- Worker registration and heartbeats → maps to service discovery
- Detecting dead workers → maps to failure detection
- Reclaiming orphaned jobs → maps to job recovery
- Graceful shutdown → maps to draining workers
- Auto-scaling based on queue depth → maps to elastic scaling
Key Concepts:
- Heartbeat Detection: “Designing Distributed Systems” Chapter 3 - Brendan Burns
- Service Discovery: “Building Microservices” Chapter 4 - Sam Newman
- Leader Election: Redis Redlock
- Graceful Shutdown: Go Graceful Shutdown
Difficulty: Advanced | Time estimate: 1-2 weeks | Prerequisites: Projects 1-2, multi-process/threading
Real world outcome:
// Start multiple workers
func main() {
pool := NewWorkerPool(WorkerConfig{
QueueName: "tasks",
RedisURL: "redis://localhost:6379",
Concurrency: 10,
HeartbeatInterval: 5 * time.Second,
StuckJobTimeout: 30 * time.Minute,
})
pool.Process("email", func(job *Job) error {
return sendEmail(job.Data)
})
pool.Process("image", func(job *Job) error {
return processImage(job.Data)
})
// Handle graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go pool.Start()
<-sigChan
log.Println("Shutting down gracefully...")
pool.Shutdown(30 * time.Second) // Wait up to 30s for jobs to complete
}
# Terminal 1 - Worker
$ go run worker.go
[Worker abc123] Registered with pool
[Worker abc123] Heartbeat sent (load: 3/10)
[Worker abc123] Processing job email-001
[Worker abc123] Completed email-001 in 150ms
# Terminal 2 - Worker
$ go run worker.go
[Worker def456] Registered with pool
[Worker def456] Heartbeat sent (load: 5/10)
# Terminal 3 - Monitor
$ go run monitor.go
╔══════════════════════════════════════════════════════════════╗
║ Worker Pool Status ║
╠══════════════════════════════════════════════════════════════╣
║ Queue: tasks ║
║ Pending: 42 | Active: 8 | Completed: 1,234 | Failed: 12 ║
╠══════════════╦═════════╦════════╦════════════╦══════════════╣
║ Worker ║ Load ║ Status ║ Last Seen ║ Jobs Done ║
╠══════════════╬═════════╬════════╬════════════╬══════════════╣
║ abc123 ║ 3/10 ║ active ║ 2s ago ║ 456 ║
║ def456 ║ 5/10 ║ active ║ 1s ago ║ 321 ║
║ ghi789 ║ 0/10 ║ DEAD ║ 35s ago ║ 89 ║
╚══════════════╩═════════╩════════╩════════════╩══════════════╝
Recovering 2 stuck jobs from dead worker ghi789...
Implementation Hints:
Worker registration:
type Worker struct {
ID string
Hostname string
PID int
StartedAt time.Time
Concurrency int
ActiveJobs int
ProcessedJobs int64
LastHeartbeat time.Time
}
func (w *Worker) Register(redis *redis.Client) error {
w.ID = uuid.New().String()
w.Hostname, _ = os.Hostname()
w.PID = os.Getpid()
w.StartedAt = time.Now()
data, _ := json.Marshal(w)
return redis.HSet(ctx, "workers", w.ID, data).Err()
}
Heartbeat loop:
func (w *Worker) HeartbeatLoop(redis *redis.Client, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
w.LastHeartbeat = time.Now()
data, _ := json.Marshal(w)
redis.HSet(ctx, "workers", w.ID, data)
// Also set an expiring key for quick death detection
redis.Set(ctx, fmt.Sprintf("worker:%s:alive", w.ID), "1", interval*3)
case <-w.shutdownChan:
return
}
}
}
Dead worker detection and job recovery:
func (p *Pool) RecoverStuckJobs() {
// Get all registered workers
workers := p.redis.HGetAll(ctx, "workers").Val()
for id, data := range workers {
var w Worker
json.Unmarshal([]byte(data), &w)
// Check if worker is dead (no heartbeat)
alive := p.redis.Exists(ctx, fmt.Sprintf("worker:%s:alive", id)).Val()
if alive == 0 && time.Since(w.LastHeartbeat) > p.config.StuckJobTimeout {
log.Printf("Worker %s is dead, recovering jobs...", id)
// Get jobs that were processing on this worker
stuckJobs := p.redis.LRange(ctx, fmt.Sprintf("worker:%s:active", id), 0, -1).Val()
for _, jobData := range stuckJobs {
// Move back to pending queue
p.redis.LPush(ctx, fmt.Sprintf("queue:%s", p.config.QueueName), jobData)
log.Printf("Recovered job from dead worker")
}
// Clean up worker
p.redis.HDel(ctx, "workers", id)
p.redis.Del(ctx, fmt.Sprintf("worker:%s:active", id))
}
}
}
Graceful shutdown:
func (w *Worker) Shutdown(timeout time.Duration) error {
// Signal shutdown
close(w.shutdownChan)
// Stop accepting new jobs
w.accepting = false
// Wait for active jobs to complete
deadline := time.Now().Add(timeout)
for w.ActiveJobs > 0 && time.Now().Before(deadline) {
time.Sleep(100 * time.Millisecond)
}
if w.ActiveJobs > 0 {
log.Printf("Warning: %d jobs still active after timeout", w.ActiveJobs)
// Move active jobs back to queue
for _, job := range w.activeJobsList {
w.redis.LPush(ctx, fmt.Sprintf("queue:%s", job.QueueName), job.Data)
}
}
// Deregister
w.redis.HDel(ctx, "workers", w.ID)
return nil
}
Learning milestones:
- Workers register and send heartbeats → You understand service discovery
- Dead workers are detected → You understand failure detection
- Orphaned jobs are recovered → You understand job durability
- Shutdown is graceful → You understand production operations
Project 7: Priority Queue with Fair Scheduling
- File: LEARN_JOB_QUEUES_DEEP_DIVE.md
- Main Programming Language: Python
- Alternative Programming Languages: Go, Java, TypeScript
- Coolness Level: Level 3: Genuinely Clever
- Business Potential: 2. The “Micro-SaaS / Pro Tool”
- Difficulty: Level 2: Intermediate
- Knowledge Area: Scheduling Algorithms / Data Structures
- Software or Tool: Redis (sorted sets)
- Main Book: “Operating System Concepts” by Silberschatz
What you’ll build: A priority queue system with multiple priority levels, weighted fair queuing to prevent starvation, and rate limiting per tenant—like a sophisticated task scheduler.
Why it teaches job queues: Not all jobs are equal. VIP customers need faster processing, but free-tier users shouldn’t starve. This project teaches scheduling fairness, a concept borrowed from OS design.
Core challenges you’ll face:
- Multi-level priority with no starvation → maps to weighted fair queuing
- Rate limiting per tenant → maps to token bucket algorithm
- Preemption for urgent jobs → maps to priority inversion
- Priority aging → maps to preventing indefinite postponement
Key Concepts:
- Priority Scheduling: “Operating System Concepts” Chapter 5 - Silberschatz
- Weighted Fair Queuing: Networking Concepts
- Token Bucket Algorithm: “Computer Networks” Chapter 5 - Tanenbaum
- Priority Inversion: Mars Pathfinder Bug
Difficulty: Intermediate | Time estimate: 1 week | Prerequisites: Projects 1-2, basic algorithm knowledge
Real world outcome:
from priority_queue import PriorityQueue, Priority, RateLimiter
queue = PriorityQueue(
redis_url='redis://localhost:6379',
priorities={
Priority.CRITICAL: {'weight': 100, 'max_age': 60}, # Process immediately
Priority.HIGH: {'weight': 50, 'max_age': 300}, # 5 min max wait
Priority.NORMAL: {'weight': 20, 'max_age': 3600}, # 1 hour max wait
Priority.LOW: {'weight': 5, 'max_age': 86400}, # 24 hour max wait
},
tenant_limits={
'enterprise': RateLimiter(1000, 'minute'),
'pro': RateLimiter(100, 'minute'),
'free': RateLimiter(10, 'minute'),
}
)
# Add jobs with different priorities
await queue.add('send_email', {'to': 'vip@corp.com'},
priority=Priority.CRITICAL, tenant='enterprise')
await queue.add('process_report', {'id': 123},
priority=Priority.NORMAL, tenant='pro')
await queue.add('sync_data', {'user': 456},
priority=Priority.LOW, tenant='free')
# Worker respects priorities and rate limits
@queue.worker
async def process(job):
print(f"Processing {job.type} (priority={job.priority}, tenant={job.tenant})")
await do_work(job)
$ python worker.py
[Scheduler] Queue status:
CRITICAL: 2 jobs (weight=100, effective=200)
HIGH: 5 jobs (weight=50, effective=250)
NORMAL: 20 jobs (weight=20, effective=400)
LOW: 100 jobs (weight=5, effective=500)
[Worker] Processing send_email (CRITICAL, enterprise)
[Worker] Processing send_email (CRITICAL, enterprise)
[Worker] Processing urgent_task (HIGH, pro)
[Worker] Processing process_report (NORMAL, pro)
[Worker] Processing process_report (NORMAL, pro)
[Worker] Processing sync_data (LOW, free) - aged 30min, priority boosted
[RateLimiter] Tenant 'free' throttled (10/10 used this minute)
Implementation Hints:
Priority queue structure with Redis sorted sets:
class PriorityQueue:
def __init__(self, redis_url, priorities, tenant_limits):
self.redis = Redis.from_url(redis_url)
self.priorities = priorities
self.tenant_limits = tenant_limits
async def add(self, job_type, data, priority, tenant):
job = {
'id': str(uuid.uuid4()),
'type': job_type,
'data': data,
'priority': priority.value,
'tenant': tenant,
'created_at': time.time()
}
# Store job data
await self.redis.hset(f'job:{job["id"]}', mapping=job)
# Add to priority queue (sorted set, lower score = higher priority)
score = self._calculate_score(priority, time.time())
await self.redis.zadd(f'queue:{priority.name}', {job['id']: score})
return job['id']
def _calculate_score(self, priority, timestamp):
# Lower score = dequeued first
# Combine priority weight with timestamp for FIFO within priority
return (1000 - self.priorities[priority]['weight']) * 1e10 + timestamp
Weighted fair queuing:
async def get_next_job(self):
"""Select next job using weighted fair queuing."""
now = time.time()
# Calculate effective weight for each priority level
weights = {}
for priority, config in self.priorities.items():
count = await self.redis.zcard(f'queue:{priority.name}')
if count > 0:
# Age-boost: increase weight for old jobs
oldest = await self.redis.zrange(f'queue:{priority.name}', 0, 0, withscores=True)
if oldest:
age = now - (oldest[0][1] % 1e10)  # score % 1e10 recovers the enqueue timestamp
age_boost = min(age / config['max_age'], 1.0) * 50 # Up to 50 bonus weight
weights[priority] = config['weight'] + age_boost
if not weights:
return None
# Weighted random selection
total = sum(weights.values())
r = random.uniform(0, total)
cumulative = 0
for priority, weight in weights.items():
cumulative += weight
if r <= cumulative:
return await self._pop_from_queue(priority)
return None
async def _pop_from_queue(self, priority):
"""Atomically pop highest priority job from queue."""
# Use ZPOPMIN to get job with lowest score (highest priority + oldest)
result = await self.redis.zpopmin(f'queue:{priority.name}')
if not result:
return None
job_id, score = result[0]
job_data = await self.redis.hgetall(f'job:{job_id}')
return Job(**job_data)
Rate limiter (a fixed-window counter, a simple approximation of the token bucket):
class RateLimiter:
def __init__(self, limit, window):
self.limit = limit
self.window = window # 'second', 'minute', 'hour'
async def check(self, redis, tenant):
"""Check if tenant can proceed, returns (allowed, remaining)."""
key = f'ratelimit:{tenant}:{self._window_key()}'
# Increment counter
count = await redis.incr(key)
# Set expiry on first increment
if count == 1:
await redis.expire(key, self._window_seconds())
if count > self.limit:
return False, 0
return True, self.limit - count
def _window_key(self):
now = datetime.utcnow()
if self.window == 'second':
return now.strftime('%Y%m%d%H%M%S')
elif self.window == 'minute':
return now.strftime('%Y%m%d%H%M')
else:
return now.strftime('%Y%m%d%H')
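Note that the code above is a fixed-window counter, which can let a tenant burst up to twice the limit across a window boundary. A true token bucket refills continuously; a minimal in-memory sketch (in JavaScript, for consistency with the earlier projects):
// Sketch: token bucket. Tokens refill at 'ratePerSec' up to 'capacity';
// each request spends one token or is throttled.
class TokenBucket {
  constructor(capacity, ratePerSec) {
    this.capacity = capacity;
    this.rate = ratePerSec;
    this.tokens = capacity;
    this.last = Date.now();
  }
  tryAcquire() {
    const now = Date.now();
    const elapsed = (now - this.last) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsed * this.rate);
    this.last = now;
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true; // allowed
    }
    return false; // throttled until tokens refill
  }
}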
Learning milestones:
- Priority levels work correctly → You understand priority queues
- Low-priority jobs don’t starve → You understand weighted fair queuing
- Rate limits are enforced → You understand token bucket
- Old jobs get priority boost → You understand aging
Project 8: Exactly-Once Processing with Idempotency
- File: LEARN_JOB_QUEUES_DEEP_DIVE.md
- Main Programming Language: Go
- Alternative Programming Languages: Java, Python, Rust
- Coolness Level: Level 4: Hardcore Tech Flex
- Business Potential: 3. The “Service & Support” Model
- Difficulty: Level 4: Expert
- Knowledge Area: Distributed Transactions / Idempotency
- Software or Tool: Redis + PostgreSQL
- Main Book: “Designing Data-Intensive Applications” by Martin Kleppmann
What you’ll build: A job processing system that guarantees exactly-once semantics through idempotency keys, transactional outbox pattern, and deduplication—critical for financial transactions.
Why it teaches job queues: “Exactly-once” is the holy grail of message processing. This project shows you it’s technically impossible at the network level but achievable through idempotency at the application level.
Core challenges you’ll face:
- Idempotency key generation and storage → maps to preventing duplicate processing
- Transactional outbox pattern → maps to reliable message publishing
- Consumer deduplication → maps to handling redeliveries
- Atomic read-process-write → maps to distributed transactions
Key Concepts:
- Idempotency: “Designing Data-Intensive Applications” Chapter 11 - Kleppmann
- Outbox Pattern: Microservices.io Transactional Outbox
- Exactly-Once Semantics: Kafka Exactly-Once
- Two-Phase Commit: “Database Internals” Chapter 12 - Alex Petrov
Difficulty: Expert | Time estimate: 2 weeks | Prerequisites: Projects 1-5, database transactions
Real world outcome:
// Payment service with exactly-once guarantee
type PaymentService struct {
db *sql.DB
queue *Queue
processor *IdempotentProcessor
}
func (s *PaymentService) ProcessPayment(ctx context.Context, job *Job) error {
// Generate idempotency key from job data
idempotencyKey := fmt.Sprintf("payment:%s:%v", job.Data.OrderID, job.Data.Amount)
return s.processor.Execute(ctx, idempotencyKey, func(tx *sql.Tx) (*Result, error) {
// Check if already processed (within transaction)
existing, err := s.getExistingResult(tx, idempotencyKey)
if err != nil {
return nil, err
}
if existing != nil {
return existing, nil // Return cached result
}
// Process payment
result, err := s.chargeCard(tx, job.Data)
if err != nil {
return nil, err
}
// Store result for idempotency (within same transaction)
if err := s.storeResult(tx, idempotencyKey, result); err != nil {
return nil, err
}
// Publish event using outbox pattern (within same transaction)
if err := s.publishEvent(tx, "payment.completed", result); err != nil {
return nil, err
}
return result, nil
})
}
$ go run payment-worker.go
[Worker] Processing job pay-001 (idempotency_key=payment:order-123:99.99)
[Worker] Result: Transaction ch_abc123 created
[Outbox] Published event payment.completed
# Same job redelivered (network issue, worker crash, etc.)
[Worker] Processing job pay-001 (idempotency_key=payment:order-123:99.99)
[Worker] DUPLICATE DETECTED: Returning cached result
[Worker] Result: Transaction ch_abc123 (cached)
# No duplicate charge! No duplicate event!
# Verify in database
$ psql -c "SELECT * FROM idempotency_keys WHERE key = 'payment:order-123:99.99'"
           key           |      result_json      |     created_at      |     expires_at
-------------------------+-----------------------+---------------------+---------------------
 payment:order-123:99.99 | {"tx_id":"ch_abc123"} | 2024-01-15 10:30:00 | 2024-01-22 10:30:00
Implementation Hints:
Idempotent processor:
type IdempotentProcessor struct {
db *sql.DB
}
func (p *IdempotentProcessor) Execute(
ctx context.Context,
key string,
fn func(tx *sql.Tx) (*Result, error),
) (*Result, error) {
// Start transaction
tx, err := p.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return nil, err
}
defer tx.Rollback()
// Try to acquire lock on idempotency key
var existingResult sql.NullString
err = tx.QueryRowContext(ctx, `
SELECT result FROM idempotency_keys
WHERE key = $1
FOR UPDATE
`, key).Scan(&existingResult)
if err == nil && existingResult.Valid {
// Already processed, return cached result
var result Result
json.Unmarshal([]byte(existingResult.String), &result)
return &result, nil
}
if err != nil && err != sql.ErrNoRows {
return nil, err
}
// Not processed yet - execute the function
result, err := fn(tx)
if err != nil {
return nil, err
}
// Store result
resultJSON, _ := json.Marshal(result)
_, err = tx.ExecContext(ctx, `
INSERT INTO idempotency_keys (key, result, created_at, expires_at)
VALUES ($1, $2, NOW(), NOW() + INTERVAL '7 days')
ON CONFLICT (key) DO UPDATE SET result = $2
`, key, resultJSON)
if err != nil {
return nil, err
}
// Commit
if err := tx.Commit(); err != nil {
return nil, err
}
return result, nil
}
Transactional outbox pattern:
func (s *PaymentService) publishEvent(tx *sql.Tx, eventType string, data interface{}) error {
eventJSON, _ := json.Marshal(data)
// Insert into outbox table (within same transaction as business logic)
_, err := tx.Exec(`
INSERT INTO outbox (id, event_type, payload, created_at, published)
VALUES ($1, $2, $3, NOW(), false)
`, uuid.New().String(), eventType, eventJSON)
return err
}
// Separate process polls outbox and publishes to message broker
func (s *OutboxProcessor) Run() {
for {
events, _ := s.db.Query(`
SELECT id, event_type, payload
FROM outbox
WHERE published = false
ORDER BY created_at
LIMIT 100
`)
for events.Next() {
var id, eventType string
var payload []byte
events.Scan(&id, &eventType, &payload)
// Publish to message broker
err := s.broker.Publish(eventType, payload)
if err != nil {
continue // Retry later
}
// Mark as published
s.db.Exec(`UPDATE outbox SET published = true WHERE id = $1`, id)
}
time.Sleep(100 * time.Millisecond)
}
}
Consumer deduplication:
func (c *Consumer) HandleMessage(msg *Message) error {
// SetNX returns true only the first time this message ID is recorded
isNew, err := c.redis.SetNX(ctx, fmt.Sprintf("seen:%s", msg.ID), "1", 24*time.Hour).Result()
if err != nil {
return err
}
if !isNew {
// Duplicate message - skip processing
log.Printf("Skipping duplicate message: %s", msg.ID)
return nil
}
// Process the message
return c.handler(msg)
}
Learning milestones:
- Duplicate jobs return same result → You understand idempotency
- Events publish atomically with data → You understand outbox pattern
- Redelivered messages don’t duplicate → You understand consumer deduplication
- Financial transactions are safe → You’ve achieved exactly-once
Project 9: Build a Cron-Like Job Scheduler
- File: LEARN_JOB_QUEUES_DEEP_DIVE.md
- Main Programming Language: Python
- Alternative Programming Languages: Go, TypeScript, Java
- Coolness Level: Level 3: Genuinely Clever
- Business Potential: 2. The “Micro-SaaS / Pro Tool”
- Difficulty: Level 2: Intermediate
- Knowledge Area: Scheduling / Time-based Systems
- Software or Tool: Redis
- Main Book: “Operating System Concepts” by Silberschatz
What you’ll build: A distributed cron-like scheduler that handles recurring jobs with cron expressions, timezone support, missed job handling, and leader election to prevent duplicates—like Celery Beat.
Why it teaches job queues: Many jobs need to run on a schedule: daily reports, hourly syncs, weekly cleanups. This project shows how to schedule reliably across multiple servers without running duplicates.
Core challenges you’ll face:
- Parsing cron expressions → maps to time calculation
- Distributed leader election → maps to preventing duplicates
- Handling missed schedules → maps to catch-up processing
- Timezone-aware scheduling → maps to time complexity
Key Concepts:
- Cron Expression Parsing: Cron Expression Guide
- Leader Election: Redis Distributed Locks
- Temporal Scheduling: “Designing Data-Intensive Applications” Chapter 11 - Kleppmann
- Quartz Scheduler: Quartz Documentation
Difficulty: Intermediate | Time estimate: 1 week | Prerequisites: Projects 1-2, datetime handling
Real world outcome:
from scheduler import Scheduler, CronTrigger, IntervalTrigger, RunOnce
scheduler = Scheduler(redis_url='redis://localhost:6379')
# Cron-based scheduling
@scheduler.schedule(CronTrigger('0 9 * * MON-FRI', timezone='America/New_York'))
async def daily_report():
"""Generate daily report every weekday at 9 AM Eastern."""
await generate_report()
# Interval-based scheduling
@scheduler.schedule(IntervalTrigger(minutes=15))
async def sync_data():
"""Sync data every 15 minutes."""
await sync_external_api()
# One-time delayed execution
@scheduler.schedule(RunOnce(at=datetime(2024, 12, 25, 0, 0), timezone='UTC'))
async def christmas_notification():
"""Send Christmas notification."""
await send_notification("Merry Christmas!")
# Start scheduler (with leader election)
await scheduler.run()
$ python scheduler.py
[Scheduler] Instance sch-abc123 attempting to acquire leadership...
[Scheduler] Instance sch-abc123 is now LEADER
[Scheduler] Registered 3 scheduled jobs
[Scheduler] Next runs:
daily_report: 2024-01-15 09:00:00 EST (in 2h 30m)
sync_data: 2024-01-15 06:45:00 UTC (in 15m)
christmas_notify: 2024-12-25 00:00:00 UTC (in 345 days)
[06:45:00] Running: sync_data
[06:45:02] Completed: sync_data (2.1s) - next run at 07:00:00
[07:00:00] Running: sync_data
...
# Another instance (standby)
$ python scheduler.py
[Scheduler] Instance sch-def456 attempting to acquire leadership...
[Scheduler] Instance sch-def456 is STANDBY (leader: sch-abc123)
[Scheduler] Watching for leader failure...
Implementation Hints:
Cron expression parser:
from dataclasses import dataclass
from datetime import datetime, timedelta
import pytz
@dataclass
class CronTrigger:
expression: str # "0 9 * * MON-FRI"
timezone: str = 'UTC'
def __post_init__(self):
parts = self.expression.split()
self.minute = self._parse_field(parts[0], 0, 59)
self.hour = self._parse_field(parts[1], 0, 23)
self.day = self._parse_field(parts[2], 1, 31)
self.month = self._parse_field(parts[3], 1, 12)
self.dow = self._parse_dow(parts[4])
self.tz = pytz.timezone(self.timezone)
def _parse_field(self, field, min_val, max_val):
if field == '*':
return set(range(min_val, max_val + 1))
if '/' in field:
base, step = field.split('/')
start = min_val if base == '*' else int(base)
return set(range(start, max_val + 1, int(step)))
if '-' in field:
start, end = map(int, field.split('-'))
return set(range(start, end + 1))
if ',' in field:
return set(map(int, field.split(',')))
return {int(field)}
def _parse_dow(self, field):
dow_map = {'SUN': 0, 'MON': 1, 'TUE': 2, 'WED': 3, 'THU': 4, 'FRI': 5, 'SAT': 6}
for name, num in dow_map.items():
field = field.replace(name, str(num))
return self._parse_field(field, 0, 6)
def next_run(self, after: datetime = None) -> datetime:
"""Calculate next run time after given datetime."""
if after is None:
after = datetime.now(self.tz)
else:
after = after.astimezone(self.tz)
# Start from next minute
candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
# Find next matching time (max 2 years lookahead)
max_iterations = 60 * 24 * 365 * 2
for _ in range(max_iterations):
if (candidate.minute in self.minute and
candidate.hour in self.hour and
candidate.day in self.day and
candidate.month in self.month and
# cron counts Sunday as 0, but Python's weekday() counts Monday as 0
candidate.isoweekday() % 7 in self.dow):
return candidate
candidate += timedelta(minutes=1)
raise ValueError("No valid schedule found in next 2 years")
Leader election with Redis:
class LeaderElection:
def __init__(self, redis, key, ttl=30):
self.redis = redis
self.key = f'scheduler:leader:{key}'
self.ttl = ttl
self.instance_id = str(uuid.uuid4())
self._is_leader = False
async def acquire_leadership(self):
"""Try to become leader. Returns True if successful."""
# SET NX with expiry - atomic operation
acquired = await self.redis.set(
self.key,
self.instance_id,
nx=True, # Only set if not exists
ex=self.ttl
)
self._is_leader = acquired
return acquired
async def maintain_leadership(self):
"""Refresh leadership lock. Must be called periodically."""
if not self._is_leader:
return False
# Only refresh if we still hold the lock
current = await self.redis.get(self.key)
if current == self.instance_id:
await self.redis.expire(self.key, self.ttl)
return True
else:
self._is_leader = False
return False
async def run_as_leader(self, callback):
"""Run callback only if we're the leader."""
while True:
if not self._is_leader:
if await self.acquire_leadership():
logger.info(f"Instance {self.instance_id} is now LEADER")
else:
leader = await self.redis.get(self.key)
logger.info(f"Instance {self.instance_id} is STANDBY (leader: {leader})")
await asyncio.sleep(self.ttl / 3)
continue
# We're leader - run the callback
await callback()
# Refresh leadership
if not await self.maintain_leadership():
logger.warning("Lost leadership!")
continue
await asyncio.sleep(self.ttl / 3)
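The refresh path above leaves a small window between the GET and the EXPIRE where leadership could change hands. A minimal sketch of an atomic renewal using a Lua script (the script and method are additions to the hint, not part of the original):
# Extend the lock only if this instance still owns it — runs atomically in Redis.
RENEW_SCRIPT = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
    return redis.call('EXPIRE', KEYS[1], ARGV[2])
end
return 0
"""

async def renew_atomically(self):
    renewed = await self.redis.eval(RENEW_SCRIPT, 1, self.key, self.instance_id, self.ttl)
    self._is_leader = bool(renewed)
    return self._is_leader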
Learning milestones:
- Cron expressions calculate correctly → You understand time calculations
- Only one scheduler runs jobs → You understand leader election
- Timezone handling works → You understand temporal complexity
- Missed jobs are handled → You understand reliability (see the catch-up sketch below)
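The last milestone, handling missed jobs, has no hint above. A minimal catch-up sketch, assuming the scheduler persists each job's last completed fire time in Redis under a `scheduler:last_run:<name>` key (the key name and re-run policy are assumptions):
async def catch_up(redis, job_name, trigger: CronTrigger, run_job):
    """Replay (or consciously skip) fires missed while no leader was alive."""
    last_raw = await redis.get(f'scheduler:last_run:{job_name}')
    if last_raw is None:
        return  # first boot: nothing to catch up
    now = datetime.now(trigger.tz)
    due = trigger.next_run(after=datetime.fromisoformat(last_raw))
    while due < now:
        await run_job()  # or skip, depending on the job's catch-up policy
        await redis.set(f'scheduler:last_run:{job_name}', due.isoformat())
        due = trigger.next_run(after=due)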
Project 10: Job Queue Dashboard with Real-Time Updates
- File: LEARN_JOB_QUEUES_DEEP_DIVE.md
- Main Programming Language: TypeScript (Node.js + React)
- Alternative Programming Languages: Python + Vue, Go + HTMX
- Coolness Level: Level 3: Genuinely Clever
- Business Potential: 2. The “Micro-SaaS / Pro Tool”
- Difficulty: Level 2: Intermediate
- Knowledge Area: Full-Stack / Real-Time
- Software or Tool: BullMQ + Redis + WebSockets
- Main Book: “Building Real-Time Web Applications” (various resources)
What you’ll build: A real-time dashboard for monitoring job queues—like Flower for Celery or the Bull Dashboard—showing queue metrics, job details, and allowing manual intervention.
Why it teaches job queues: Observability is crucial for production systems. This project shows you what metrics matter, how to expose them, and how to build tools for debugging job queue issues.
Core challenges you’ll face:
- Aggregating queue metrics → maps to monitoring patterns
- Real-time updates with WebSockets → maps to live dashboards
- Job inspection and manipulation → maps to admin operations
- Historical data and charts → maps to observability
Key Concepts:
- Queue Metrics: BullMQ Dashboard
- WebSocket Real-Time: Socket.io Documentation
- Time-Series Data: Prometheus Metrics
- Redis Pub/Sub: Redis Pub/Sub
Difficulty: Intermediate Time estimate: 1-2 weeks Prerequisites: Projects 1-2, basic React/frontend
Real world outcome:
$ npm start
[Dashboard] Connecting to Redis at localhost:6379
[Dashboard] Found 3 queues: emails, images, reports
[Dashboard] WebSocket server listening on :3001
[Dashboard] HTTP server listening on :3000
# Open browser to http://localhost:3000
┌─────────────────────────────────────────────────────────────────────────┐
│ Job Queue Dashboard │
├─────────────────────────────────────────────────────────────────────────┤
│ Queues Overview 🟢 Connected │
├──────────────┬───────────┬───────────┬───────────┬───────────┬──────────┤
│ Queue        │ Waiting   │ Active    │ Completed │ Failed    │ Rate     │
├──────────────┼───────────┼───────────┼───────────┼───────────┼──────────┤
│ 📧 emails    │ 42        │ 5/10      │ 12,456    │ 23        │ 15/s     │
│ 🖼️ images    │ 156       │ 8/8       │ 8,901     │ 45        │ 8/s      │
│ 📊 reports   │ 3         │ 1/4       │ 234       │ 2         │ 0.5/s    │
└──────────────┴───────────┴───────────┴───────────┴───────────┴──────────┘
┌─────────────────────────────────────────────────────────────────────────┐
│ Queue: emails                                                            │
├─────────────────────────────────────────────────────────────────────────┤
│ [Chart: Jobs processed over time - live updating] │
│ │
│ ▄▄█▄▄███████▄▄▄▄████████████▄▄▄█████▄ │
│ ▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀▀ │
│ 10:00 10:15 10:30 10:45 11:00 │
├─────────────────────────────────────────────────────────────────────────┤
│ Recent Jobs [Retry All Failed] │
├──────────────┬──────────┬────────────────────┬───────────┬──────────────┤
│ ID │ Status │ Data │ Duration │ Actions │
├──────────────┼──────────┼────────────────────┼───────────┼──────────────┤
│ email-15432 │ 🟢 done │ to: user@test.com │ 145ms │ [View] │
│ email-15431 │ 🔴 fail │ to: bad@email │ 23ms │ [Retry][Del] │
│ email-15430 │ 🟡 active│ to: vip@corp.com │ 2s... │ [View] │
│ email-15429 │ 🟢 done │ to: hello@world │ 89ms │ [View] │
└──────────────┴──────────┴────────────────────┴───────────┴──────────────┘
Implementation Hints:
Backend metrics API:
// metrics.ts
import { Queue } from 'bullmq';

interface QueueMetrics {
name: string;
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
paused: boolean;
workers: number;
throughput: number; // jobs/second over last minute
}
async function getQueueMetrics(queue: Queue): Promise<QueueMetrics> {
const [waiting, active, completed, failed, delayed, workers] = await Promise.all([
queue.getWaitingCount(),
queue.getActiveCount(),
queue.getCompletedCount(),
queue.getFailedCount(),
queue.getDelayedCount(),
queue.getWorkers()
]);
// Calculate throughput from recent completions
const recentJobs = await queue.getCompleted(0, 60);
const oneMinuteAgo = Date.now() - 60000;
  const throughput = recentJobs.filter(j => (j.finishedOn ?? 0) > oneMinuteAgo).length / 60;
return {
name: queue.name,
waiting, active, completed, failed, delayed,
paused: await queue.isPaused(),
workers: workers.length,
throughput
};
}
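The Key Concepts above mention Prometheus-style time-series data. A hedged sketch of exporting the same numbers for scraping via the `prom-client` package (the metric name and endpoint wiring are assumptions, not part of the hint):
// prometheus.ts — optional export of queue depth for Prometheus scraping
import { Gauge, register } from 'prom-client';

const waitingGauge = new Gauge({
  name: 'queue_waiting_jobs',
  help: 'Number of jobs waiting in each queue',
  labelNames: ['queue'],
});

function recordMetrics(metrics: QueueMetrics[]) {
  for (const m of metrics) waitingGauge.set({ queue: m.name }, m.waiting);
}

// Serve with e.g. app.get('/metrics', async (_req, res) => {
//   res.set('Content-Type', register.contentType);
//   res.send(await register.metrics());
// });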
Real-time WebSocket updates:
// server.ts
import { createServer } from 'http';
import { Server } from 'socket.io';
import { Queue, QueueEvents } from 'bullmq';

const redis = { host: 'localhost', port: 6379 }; // BullMQ connection options
const server = createServer();
const io = new Server(server, { cors: { origin: '*' } });
server.listen(3001); // WebSocket endpoint; the React app is served separately on :3000

const queues = new Map<string, Queue>();
const queueEvents = new Map<string, QueueEvents>();
// Initialize queue listeners
for (const queueName of ['emails', 'images', 'reports']) {
const queue = new Queue(queueName, { connection: redis });
const events = new QueueEvents(queueName, { connection: redis });
queues.set(queueName, queue);
queueEvents.set(queueName, events);
// Forward queue events to WebSocket clients
events.on('completed', ({ jobId, returnvalue }) => {
io.to(queueName).emit('job:completed', { jobId, result: returnvalue });
});
events.on('failed', ({ jobId, failedReason }) => {
io.to(queueName).emit('job:failed', { jobId, error: failedReason });
});
events.on('active', ({ jobId }) => {
io.to(queueName).emit('job:active', { jobId });
});
events.on('waiting', ({ jobId }) => {
io.to(queueName).emit('job:waiting', { jobId });
});
}
io.on('connection', (socket) => {
console.log('Client connected');
// Subscribe to queue updates
socket.on('subscribe', (queueName) => {
socket.join(queueName);
console.log(`Client subscribed to ${queueName}`);
});
// Send initial metrics
socket.on('getMetrics', async () => {
const metrics = await Promise.all(
Array.from(queues.values()).map(getQueueMetrics)
);
socket.emit('metrics', metrics);
});
// Admin actions
socket.on('retryJob', async ({ queueName, jobId }) => {
const queue = queues.get(queueName);
    const job = await queue?.getJob(jobId); // guard: queue may be undefined for unknown names
if (job) await job.retry();
});
socket.on('deleteJob', async ({ queueName, jobId }) => {
const queue = queues.get(queueName);
    const job = await queue?.getJob(jobId); // guard: queue may be undefined for unknown names
if (job) await job.remove();
});
});
// Periodic metrics broadcast
setInterval(async () => {
const metrics = await Promise.all(
Array.from(queues.values()).map(getQueueMetrics)
);
io.emit('metrics', metrics);
}, 1000);
React frontend with real-time updates:
// Dashboard.tsx
import { useEffect, useState } from 'react';
import { io } from 'socket.io-client';

function Dashboard() {
  const [metrics, setMetrics] = useState<QueueMetrics[]>([]);
  const [jobs, setJobs] = useState<Map<string, Job[]>>(new Map());
useEffect(() => {
const socket = io('http://localhost:3001');
socket.on('connect', () => {
socket.emit('getMetrics');
socket.emit('subscribe', 'emails');
});
socket.on('metrics', (data) => {
setMetrics(data);
});
    // updateJobStatus (not shown) merges the new status into the jobs map
    socket.on('job:completed', ({ jobId, result }) => {
updateJobStatus(jobId, 'completed', result);
});
socket.on('job:failed', ({ jobId, error }) => {
updateJobStatus(jobId, 'failed', error);
});
return () => socket.close();
}, []);
return (
<div className="dashboard">
<QueuesOverview metrics={metrics} />
<QueueDetail queue="emails" jobs={jobs.get('emails')} />
<ThroughputChart data={throughputHistory} />
</div>
);
}
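`throughputHistory` is referenced above but never built. A minimal sketch of a hook that accumulates it from the broadcast metrics (the hook name, `Sample` shape, and 300-sample window are assumptions):
// useThroughputHistory.ts — hypothetical helper for the chart above
import { useEffect, useState } from 'react';

interface Sample { time: number; jobsPerSecond: number; }

function useThroughputHistory(metrics: QueueMetrics[], queueName: string): Sample[] {
  const [history, setHistory] = useState<Sample[]>([]);
  useEffect(() => {
    const current = metrics.find(m => m.name === queueName);
    if (!current) return;
    // Keep a rolling window of ~5 minutes at the server's 1s broadcast rate
    setHistory(prev =>
      [...prev, { time: Date.now(), jobsPerSecond: current.throughput }].slice(-300)
    );
  }, [metrics, queueName]);
  return history;
}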
Learning milestones:
- Queue metrics are accurate → You understand queue internals
- Updates appear in real-time → You understand pub/sub
- Admin actions work → You understand queue operations
- Charts show trends → You understand time-series data
Project Comparison Table
| Project | Difficulty | Time | Depth of Understanding | Fun Factor |
|---|---|---|---|---|
| 1. In-Memory Job Queue | Beginner | Weekend | Foundation | ★★★☆☆ |
| 2. Redis-Based Queue (BullMQ) | Advanced | 1-2 weeks | Deep | ★★★★☆ |
| 3. AMQP Protocol Client | Expert | 2-3 weeks | Protocol-level | ★★★★★ |
| 4. Kafka-Style Event Log | Master | 3-4 weeks | Distributed Systems | ★★★★★ |
| 5. Dead Letter Queue + Retry | Intermediate | 1 week | Reliability | ★★★★☆ |
| 6. Distributed Worker Pool | Advanced | 1-2 weeks | Operations | ★★★★☆ |
| 7. Priority Queue + Fair Scheduling | Intermediate | 1 week | Algorithms | ★★★☆☆ |
| 8. Exactly-Once Processing | Expert | 2 weeks | Transactions | ★★★★☆ |
| 9. Cron-Like Scheduler | Intermediate | 1 week | Time-based | ★★★☆☆ |
| 10. Real-Time Dashboard | Intermediate | 1-2 weeks | Full-Stack | ★★★★☆ |
Recommended Learning Path
Path A: Backend Developer (6-8 weeks)
For developers building production job processing systems:
- Week 1: Project 1 (In-Memory Queue) → Project 2 (Redis Queue)
- Week 2-3: Project 5 (DLQ + Retry) → Project 6 (Worker Pool)
- Week 4: Project 7 (Priority) → Project 9 (Scheduler)
- Week 5-6: Project 8 (Exactly-Once)
- Week 7-8: Project 10 (Dashboard)
Path B: Distributed Systems Engineer (8-10 weeks)
For developers who want deep protocol understanding:
- Week 1: Project 1 (Fundamentals)
- Week 2-4: Project 3 (AMQP Protocol)
- Week 5-8: Project 4 (Kafka-Style Log)
- Week 9: Project 8 (Exactly-Once)
- Week 10: Project 6 (Worker Pool)
Path C: DevOps/SRE (4-5 weeks)
For operators who need to run and monitor queues:
- Week 1: Project 2 (Redis Queue basics)
- Week 2: Project 5 (DLQ + Retry) → Project 6 (Worker Pool)
- Week 3: Project 9 (Scheduler)
- Week 4-5: Project 10 (Dashboard)
Final Capstone: Production-Ready Distributed Job Processing System
- File: LEARN_JOB_QUEUES_DEEP_DIVE.md
- Main Programming Language: Go
- Alternative Programming Languages: Rust, Java
- Coolness Level: Level 5: Pure Magic (Super Cool)
- Business Potential: 4. The “Open Core” Infrastructure
- Difficulty: Level 5: Master
- Knowledge Area: Distributed Systems
- Software or Tool: Custom implementation
- Main Book: “Designing Data-Intensive Applications” by Martin Kleppmann
What you’ll build: A complete, production-ready job processing system combining everything: Redis-backed queues with distributed workers, priority scheduling, exactly-once semantics, cron scheduling, dead letter queues, real-time dashboard, and horizontal scaling—your own Sidekiq/Celery.
Why it’s the capstone: This synthesizes every concept. You’ll understand not just how job queues work, but how to build infrastructure that processes millions of jobs reliably.
Components to integrate:
- Redis-backed priority queues with delayed jobs
- Distributed worker pool with heartbeats and auto-recovery
- Dead letter queues with exponential backoff retry
- Exactly-once processing with idempotency
- Cron-like scheduled jobs with leader election
- Real-time monitoring dashboard
- Prometheus metrics export
- Graceful shutdown and job draining (sketched after the configuration example below)
Real world outcome:
// Full system configuration
config := &JobSystemConfig{
Redis: "redis://localhost:6379",
Queues: []QueueConfig{
{Name: "critical", Priority: 100, Workers: 20},
{Name: "default", Priority: 50, Workers: 10},
{Name: "bulk", Priority: 10, Workers: 5},
},
Retry: RetryConfig{
MaxAttempts: 5,
ExponentialBase: 2,
MaxDelay: time.Hour,
},
Scheduler: SchedulerConfig{
Enabled: true,
LeaderLockTTL: 30 * time.Second,
},
Dashboard: DashboardConfig{
Enabled: true,
Port: 3000,
},
Metrics: MetricsConfig{
Enabled: true,
Port: 9090,
},
}
system, err := NewJobSystem(config)
if err != nil {
log.Fatal(err)
}
// Register job handlers
system.RegisterHandler("send_email", func(ctx context.Context, job *Job) error {
return emailService.Send(job.Data)
}, HandlerOptions{
Queue: "default",
Timeout: 30 * time.Second,
Idempotency: true,
})
system.RegisterHandler("process_payment", func(ctx context.Context, job *Job) error {
return paymentService.Process(job.Data)
}, HandlerOptions{
Queue: "critical",
Timeout: time.Minute,
Idempotency: true,
ExactlyOnce: true,
})
// Register scheduled jobs
system.Schedule("daily_report", "0 9 * * *", func(ctx context.Context) error {
return reportService.GenerateDaily()
})
// Start the system
system.Start()
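The component list calls for graceful shutdown and job draining, and the snippet above stops at system.Start(). A minimal sketch of the remaining wiring, assuming a system.Shutdown(ctx) method that stops claiming new jobs and waits for in-flight ones (the method name and 30-second budget are assumptions):
// Graceful shutdown sketch (needs "os", "os/signal", "syscall", "context" imports).
// Block until the process receives SIGINT or SIGTERM.
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs

// Give in-flight jobs up to 30 seconds to drain; anything still running at
// the deadline should be released back to the queue for another worker.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := system.Shutdown(ctx); err != nil {
    log.Printf("shutdown incomplete: %v", err)
}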
Learning milestones:
- All components integrate smoothly → You understand system design
- Workers scale horizontally → You understand distribution
- Failures are handled gracefully → You understand reliability
- Metrics show system health → You understand observability
- Dashboard enables operations → You understand production needs
Essential Resources
Books
- “Designing Data-Intensive Applications” by Martin Kleppmann - The bible for distributed systems
- “RabbitMQ in Depth” by Gavin M. Roy - Deep dive into AMQP and RabbitMQ
- “Kafka: The Definitive Guide” by Neha Narkhede, Gwen Shapira & Todd Palino - Official Kafka book
- “Release It!” by Michael Nygard - Reliability patterns
- “Redis in Action” by Josiah Carlson - Redis patterns and use cases
Online Resources
- Kafka Architecture Course - Free Confluent course
- RabbitMQ AMQP Concepts - Official tutorial
- BullMQ Documentation - Modern Redis queue
- AWS SQS Best Practices
- Building a Task Queue from Scratch - Excellent tutorial series
Articles
- At most once, at least once, exactly once - ByteByteGo
- Kafka vs RabbitMQ - AWS comparison
- Exponential Backoff and Jitter - AWS Architecture
Summary
| # | Project | Main Language | Knowledge Area |
|---|---|---|---|
| 1 | Simple In-Memory Job Queue | JavaScript (Node.js) | Data Structures / Concurrency |
| 2 | Redis-Based Job Queue | JavaScript (Node.js) | Redis / Distributed Systems |
| 3 | AMQP Protocol Client | JavaScript (Node.js) | Network Protocols / Binary Parsing |
| 4 | Kafka-Style Event Log | Go | Distributed Systems / Storage Engines |
| 5 | Dead Letter Queue + Retry | Python | Reliability Patterns / Error Handling |
| 6 | Distributed Worker Pool | Go | Distributed Systems / Monitoring |
| 7 | Priority Queue with Fair Scheduling | Python | Scheduling Algorithms / Data Structures |
| 8 | Exactly-Once Processing | Go | Distributed Transactions / Idempotency |
| 9 | Cron-Like Job Scheduler | Python | Scheduling / Time-based Systems |
| 10 | Real-Time Dashboard | TypeScript | Full-Stack / Real-Time |
| Capstone | Production Job Processing System | Go | Complete Distributed Systems |
What You’ll Understand After These Projects
- Queue Fundamentals: FIFO, priority, delayed, and recurring jobs
- Message Brokers: RabbitMQ’s AMQP vs Kafka’s commit log
- Delivery Semantics: At-most-once, at-least-once, exactly-once tradeoffs
- Reliability Patterns: Retry with backoff, dead letter queues, idempotency
- Distributed Workers: Heartbeats, leader election, graceful shutdown
- Production Operations: Monitoring, dashboards, scaling
You won’t just use job queues anymore—you’ll understand them. And understanding means you can debug issues, design resilient systems, and choose the right tool for every scenario.
Happy building! Message queues are the backbone of modern distributed systems.