Project 26: Multi-Session Orchestrator - Parallel Claude Instances

Build an orchestrator that runs multiple Claude instances in parallel: analyze different parts of a codebase concurrently, aggregate results, manage session IDs for resume/continue, and handle failures with retries.

Quick Reference

Attribute        Value
---------        -----
Difficulty       Expert
Time Estimate    2-3 weeks
Language         Python (Alternatives: TypeScript, Go, Rust)
Prerequisites    Projects 24-25 completed, concurrency experience
Key Topics       Session management, asyncio, parallel processing, scatter-gather, retry logic
Main Book        "Concurrency in Python" by Matthew Fowler

1. Learning Objectives

By completing this project, you will:

  1. Master session management: Use --resume and --continue for resumable workflows
  2. Implement parallel execution: Run multiple Claude instances concurrently with asyncio
  3. Apply scatter-gather patterns: Divide work, process in parallel, aggregate results
  4. Handle partial failures: Implement retry logic and graceful degradation
  5. Design work partitioning: Divide codebases intelligently for parallel analysis
  6. Control concurrency limits: Use semaphores to respect API rate limits
  7. Build checkpoint systems: Enable interrupted jobs to resume from last checkpoint

2. Real World Outcome

When complete, you'll have an orchestrator that dramatically speeds up large-scale analysis:

$ python orchestrator.py --analyze ./large-codebase --workers 5

Starting parallel analysis with 5 workers...

Worker 1: Analyzing src/auth/* (12 files)
Worker 2: Analyzing src/api/* (18 files)
Worker 3: Analyzing src/utils/* (8 files)
Worker 4: Analyzing src/components/* (34 files)
Worker 5: Analyzing src/services/* (15 files)

Progress:
[Worker 1] ---------------------------------------- 100%
[Worker 2] --------------------------              65%
[Worker 3] ---------------------------------------- 100%
[Worker 4] ----------------                        40%
[Worker 5] ----------------------------            70%

Aggregating results...

Analysis Complete
------------------------------
- Total files: 87
- Time: 45s (vs 3m 45s sequential)
- Sessions used: 5
- Issues found: 23

Session IDs saved for resume:
- auth: session_abc123
- api: session_def456
- utils: session_ghi789
- components: session_jkl012
- services: session_mno345

Performance Comparison

Approach              Files   Time     Cost
--------              -----   ----     ----
Sequential            87      3m 45s   $0.45
Parallel (5 workers)  87      45s      $0.45

Same cost, 5x faster.


3. The Core Question You're Answering

"How do I run multiple Claude instances in parallel to speed up large-scale analysis while managing sessions for resumability?"

Large codebases need parallel processing. Sequential analysis of 100+ files takes too long for practical use. This project teaches you to orchestrate multiple Claude processes, manage their sessions, and aggregate their results.

Why This Matters

Consider a typical enterprise codebase:

  • 500+ files across 50+ directories
  • Sequential analysis: 30+ minutes
  • Parallel analysis (10 workers): 3-5 minutes

The speedup comes from I/O parallelism: while one Claude instance waits for an API response, the others keep working.


4. Concepts You Must Understand First

Stop and research these before coding:

4.1 Session Management

Claude Code maintains sessions that enable continuity:

# Initial run - get a session ID
claude -p "Analyze files" --output-format json
# Output includes: {"session_id": "abc123", ...}

# Resume the same session later
claude -p "What did you find?" --resume abc123

# Continue the most recent session
claude -p "Show me the critical issues" --continue

Key Questions:

  • What state does a session preserve?
  • When should you use --resume vs --continue?
  • How long do sessions persist?
  • What's the session_id format?

Reference: Claude Code documentation - "Sessions"

4.2 Parallel Execution in Python

Two main approaches:

# asyncio - for I/O-bound work (API calls)
import asyncio

async def analyze(files):
    proc = await asyncio.create_subprocess_exec(...)
    stdout, _ = await proc.communicate()
    return stdout

# Run many concurrently
results = await asyncio.gather(*[analyze(chunk) for chunk in chunks])

# multiprocessing - for CPU-bound work
import subprocess
from multiprocessing import Pool

def analyze(files):
    result = subprocess.run([...], capture_output=True)
    return result.stdout

with Pool(5) as p:
    results = p.map(analyze, chunks)

For Claude (I/O-bound), asyncio is typically better.

Reference: "Concurrency in Python" by Fowler, Chapters 4-6

4.3 Scatter-Gather Pattern

From "Enterprise Integration Patterns":

SCATTER-GATHER PATTERN

             ┌──────────────────┐
             │     SPLITTER     │
             │   Divide work    │
             └────────┬─────────┘
                      │
     ┌────────────────┼────────────────┐
     ▼                ▼                ▼
┌──────────┐     ┌──────────┐     ┌──────────┐
│ Worker 1 │     │ Worker 2 │     │ Worker N │
│ (Claude) │     │ (Claude) │     │ (Claude) │
└────┬─────┘     └────┬─────┘     └────┬─────┘
     │                │                │
     └────────────────┼────────────────┘
                      │
                      ▼
             ┌──────────────────┐
             │    AGGREGATOR    │
             │ Combine results  │
             └──────────────────┘

Key insight: Each worker is independent, so failures don't cascade.
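
A minimal sketch of the pattern with asyncio (the worker here is a toy stand-in; later sections replace it with a real claude subprocess call):

import asyncio

async def worker(name: str, files: list[str]) -> dict:
    """Stand-in for a Claude call: returns per-chunk findings."""
    await asyncio.sleep(0.1)  # simulate waiting on I/O
    return {"chunk": name, "files": len(files)}

async def scatter_gather(chunks: dict[str, list[str]]) -> list[dict]:
    # Scatter: one task per chunk; gather: wait for all of them
    tasks = [worker(name, files) for name, files in chunks.items()]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Aggregate: keep successes so one failure doesn't cascade
    return [r for r in results if not isinstance(r, Exception)]

chunks = {"auth": ["a.py"], "api": ["b.py", "c.py"]}
print(asyncio.run(scatter_gather(chunks)))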


5. Questions to Guide Your Design

5.1 Work Division Strategies

Strategy       Pros                    Cons                      Use When
--------       ----                    ----                      --------
By directory   Logical grouping        Uneven sizes              Clear module boundaries
By file type   Consistent processing   May split related files   Different analysis per type
By file size   Even workload           Ignores relationships     Raw throughput matters
Equal chunks   Balanced load           May split modules         No natural grouping
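
The "by file size" strategy can be a simple greedy bin-packing: always give the next-largest file to the lightest chunk. A sketch (the chunk count and the ./large-codebase path are illustrative):

from pathlib import Path

def partition_by_size(files: list[Path], num_chunks: int = 5) -> list[list[Path]]:
    """Greedy bin packing: keep the total bytes per chunk roughly even."""
    chunks = [[] for _ in range(num_chunks)]
    sizes = [0] * num_chunks
    for f in sorted(files, key=lambda p: p.stat().st_size, reverse=True):
        i = sizes.index(min(sizes))  # lightest chunk so far
        chunks[i].append(f)
        sizes[i] += f.stat().st_size
    return [c for c in chunks if c]

files = [p for p in Path("./large-codebase").rglob("*.py") if p.is_file()]
for i, chunk in enumerate(partition_by_size(files), 1):
    print(f"Worker {i}: {len(chunk)} files")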

5.2 Concurrency Control

# Too many workers = rate limit errors
workers = 100  # BAD: API will reject

# Too few = slow
workers = 1    # BAD: No parallelism

# Find the sweet spot
workers = 5    # Good: Parallel but within limits

Questions:

  • What's Claude's rate limit?
  • How do you implement backoff?
  • Should workers be dynamic?

5.3 Error Recovery

Error Type         Action
----------         ------
Rate limit (429)   Exponential backoff, retry
Timeout            Save checkpoint, retry once
Parse error        Log, skip file, continue
Worker crash       Restart worker, resume session
API unavailable    Wait, retry all pending
Partial results    Save what we have, mark incomplete
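
One way the matrix might translate into code; RateLimitError and ParseError are placeholder exception types for whatever your run_claude wrapper raises, not part of any library:

import asyncio

class RateLimitError(Exception): ...
class ParseError(Exception): ...

async def analyze_safely(chunk, run_claude, save_checkpoint, max_retries=3):
    """Apply the recovery matrix: back off on 429, checkpoint on timeout, skip on parse errors."""
    for attempt in range(max_retries):
        try:
            return await run_claude(chunk)
        except RateLimitError:
            await asyncio.sleep(2 ** attempt)  # exponential backoff, then retry
        except asyncio.TimeoutError:
            save_checkpoint(chunk)  # save progress; retry once, then give up
            if attempt >= 1:
                raise
        except ParseError as e:
            print(f"Skipping chunk with unparseable output: {e}")
            return None  # log, skip, continue
    raise RuntimeError("Max retries exceeded")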

6. Thinking Exercise

Design the Orchestrator

ORCHESTRATOR

Input: Large codebase
         │
         ▼
┌─────────────────┐
│ PARTITION       │  Split files into chunks
│ Strategy:       │  - By directory
│   directory     │  - 5-20 files per chunk
└────────┬────────┘
         │
         ├───────────────────┬───────────────────┐
         ▼                   ▼                   ▼
┌────────────────┐  ┌────────────────┐  ┌────────────────┐
│   WORKER 1     │  │   WORKER 2     │  │   WORKER N     │
│   claude -p    │  │   claude -p    │  │   claude -p    │
│   session_1    │  │   session_2    │  │   session_n    │
│  [semaphore]   │  │  [semaphore]   │  │  [semaphore]   │
└────────┬───────┘  └────────┬───────┘  └────────┬───────┘
         │                   │                   │
         └───────────────────┼───────────────────┘
                             │
                             ▼
                    ┌─────────────────┐
                    │   AGGREGATOR    │
                    │ - Combine issues│
                    │ - Deduplicate   │
                    │ - Sort by file  │
                    └─────────────────┘

┌──────────────────────────────────────────────┐
│ CHECKPOINT STORE                             │
│ - Session IDs per chunk                      │
│ - Processed files                            │
│ - Partial results                            │
└──────────────────────────────────────────────┘

Design Questions

  1. How do you handle one worker failing?
    • Other workers continue
    • Failed worker retries with backoff
    • After max retries, save checkpoint for manual resume
  2. How do you track which session analyzed which files?
    • Maintain a mapping: {chunk_name: session_id}
    • Save to JSON file for resume capability
    • Include file list in each session's context
  3. What if you need to re-run a specific worker?
    • Use saved session_id with --resume
    • Or start fresh with new session
    • Depends on whether context is still needed

7. The Interview Questions They'll Ask

7.1 "How would you parallelize AI workloads while respecting rate limits?"

Good answer structure:

  1. Use asyncio for I/O-bound parallelism
  2. Limit concurrent requests with Semaphore
  3. Implement exponential backoff for 429 errors
  4. Track usage to stay within budget
  5. Queue excess work for later processing

async def analyze_with_limit(semaphore, files):
    async with semaphore:  # Only N concurrent
        return await run_claude(files)

7.2 "What's the scatter-gather pattern and when would you use it?"

Key points:

  • Scatter: Split work into independent chunks
  • Process: Run chunks in parallel
  • Gather: Aggregate results

Use when:

  • Work is divisible into independent pieces
  • Processing time dominates
  • Order doesn't matter
  • Partial failures are acceptable

7.3 "How do you handle partial failures in parallel processing?"

Strategies:

  • Isolate failures to individual workers
  • Save partial results before failing
  • Implement checkpoints for resume
  • Retry with exponential backoff
  • Fallback to sequential for problematic chunks

7.4 "How would you implement resumable parallel jobs?"

Implementation:

  1. Save session_id for each worker
  2. Track completed chunks in checkpoint file
  3. On resume, skip completed chunks
  4. Use --resume session_id for incomplete chunks
  5. Re-aggregate all results (new + cached)

7.5 "What are the trade-offs of parallelism vs sequential processing?"

Aspect            Sequential     Parallel
------            ----------     --------
Complexity        Simple         Complex
Speed             Slow           Fast
Debugging         Easy           Hard
Cost              Same           Same
Failure handling  Stop on first  Isolated
Memory            Low            Higher
Rate limits       No issues      Must manage

8. Hints in Layers

Hint 1: Use asyncio.Semaphore

Control concurrent processes:

async def main():
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent

    async def limited_analyze(files):
        async with semaphore:
            return await analyze(files)

    tasks = [limited_analyze(chunk) for chunk in chunks]
    results = await asyncio.gather(*tasks)

Hint 2: Track Session IDs

Save for resume capability:

import json

sessions = {}

async def analyze_chunk(name, files):
    result = await run_claude(files)
    session_id = result.get("session_id")
    sessions[name] = session_id  # Save for later
    return result

# Save sessions to file
with open("sessions.json", "w") as f:
    json.dump(sessions, f)

Hint 3: Use asyncio.gather

Run all workers concurrently:

async def run_all():
    tasks = [
        analyze_chunk("auth", auth_files),
        analyze_chunk("api", api_files),
        analyze_chunk("utils", util_files),
    ]

    # Run concurrently, wait for all
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Check for failures
    for name, result in zip(["auth", "api", "utils"], results):
        if isinstance(result, Exception):
            print(f"Worker {name} failed: {result}")
        else:
            print(f"Worker {name} complete: {len(result['issues'])} issues")

Hint 4: Implement Retries

Use exponential backoff:

async def analyze_with_retry(files, max_retries=3):
    # RateLimitError is a placeholder for whatever your run_claude wrapper raises on a 429
    for attempt in range(max_retries):
        try:
            return await run_claude(files)
        except RateLimitError:
            wait_time = 2 ** attempt  # 1, 2, 4 seconds
            print(f"Rate limited, waiting {wait_time}s...")
            await asyncio.sleep(wait_time)
    raise Exception("Max retries exceeded")

9. Books That Will Help

Topic                 Book                                                   Chapter/Section
-----                 ----                                                   ---------------
Python concurrency    "Concurrency in Python" by Fowler                      Ch. 4: asyncio Fundamentals
Python concurrency    "Concurrency in Python" by Fowler                      Ch. 5: Non-blocking I/O
Python concurrency    "Concurrency in Python" by Fowler                      Ch. 6: Handling CPU-bound work
Integration patterns  "Enterprise Integration Patterns" by Hohpe             Ch. 8: Message Routing
Distributed systems   "Designing Data-Intensive Applications" by Kleppmann   Ch. 8: Distributed Systems
Reliable systems      "Release It!" by Nygard                                Ch. 4-5: Stability patterns

10. Implementation Guide

10.1 Complete Orchestrator

import asyncio
import subprocess
import json
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class ChunkResult:
    name: str
    session_id: str
    files_analyzed: int
    issues: list
    error: Optional[str] = None

@dataclass
class OrchestratorState:
    sessions: dict = field(default_factory=dict)
    results: list = field(default_factory=list)
    completed: set = field(default_factory=set)

class ClaudeOrchestrator:
    def __init__(self, max_workers: int = 5, checkpoint_file: str = "checkpoint.json"):
        self.semaphore = asyncio.Semaphore(max_workers)
        self.checkpoint_file = checkpoint_file
        self.state = OrchestratorState()
        self._load_checkpoint()

    def _load_checkpoint(self):
        """Load previous state for resume."""
        try:
            with open(self.checkpoint_file) as f:
                data = json.load(f)
                self.state.sessions = data.get("sessions", {})
                self.state.completed = set(data.get("completed", []))
                print(f"Loaded checkpoint: {len(self.state.completed)} chunks complete")
        except FileNotFoundError:
            pass

    def _save_checkpoint(self):
        """Save state for resume."""
        with open(self.checkpoint_file, "w") as f:
            json.dump({
                "sessions": self.state.sessions,
                "completed": list(self.state.completed)
            }, f, indent=2)

    async def analyze_chunk(self, name: str, files: list[str]) -> ChunkResult:
        """Analyze a chunk of files with rate limiting."""
        async with self.semaphore:
            print(f"[{name}] Starting analysis of {len(files)} files...")

            prompt = f"""Analyze these files for issues:
            {' '.join(files)}

            Report:
            1. Security vulnerabilities
            2. Bug risks
            3. Code quality issues

            Format: JSON with 'issues' array."""

            try:
                proc = await asyncio.create_subprocess_exec(
                    "claude", "-p", prompt,
                    "--output-format", "json",
                    "--max-turns", "5",
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE
                )

                stdout, stderr = await asyncio.wait_for(
                    proc.communicate(),
                    timeout=120  # 2 minute timeout per chunk
                )

                result = json.loads(stdout.decode())
                session_id = result.get("session_id", "unknown")

                # Save session for resume
                self.state.sessions[name] = session_id
                self.state.completed.add(name)
                self._save_checkpoint()

                print(f"[{name}] Complete! Session: {session_id[:8]}...")

                return ChunkResult(
                    name=name,
                    session_id=session_id,
                    files_analyzed=len(files),
                    issues=self._extract_issues(result)
                )

            except asyncio.TimeoutError:
                proc.kill()  # don't leave a hung claude process behind
                return ChunkResult(
                    name=name,
                    session_id="",
                    files_analyzed=0,
                    issues=[],
                    error="Timeout"
                )
            except Exception as e:
                return ChunkResult(
                    name=name,
                    session_id="",
                    files_analyzed=0,
                    issues=[],
                    error=str(e)
                )

    def _extract_issues(self, result: dict) -> list:
        """Extract issues from Claude's response."""
        text = result.get("result", "")
        # Simple extraction - in practice, use JSON schema
        issues = []
        for line in text.split("\n"):
            if any(kw in line.lower() for kw in ["issue", "vulnerability", "bug"]):
                issues.append(line.strip())
        return issues

    def partition_by_directory(self, base_path: Path) -> dict[str, list[str]]:
        """Partition files by directory."""
        chunks = {}

        for subdir in base_path.iterdir():
            if not subdir.is_dir():
                continue
            if subdir.name.startswith("."):
                continue

            files = [str(f) for f in subdir.glob("**/*")
                    if f.is_file() and f.suffix in [".py", ".ts", ".js", ".go"]]

            if files:
                chunks[subdir.name] = files

        return chunks

    async def run(self, base_path: Path) -> dict:
        """Run the full orchestration."""
        print(f"\nPartitioning {base_path}...")
        chunks = self.partition_by_directory(base_path)

        # Skip already completed chunks
        pending = {k: v for k, v in chunks.items()
                   if k not in self.state.completed}

        print(f"Found {len(chunks)} directories, {len(pending)} pending\n")

        if not pending:
            print("All chunks complete! Use cached results.")
            return self._aggregate_cached()

        # Run pending chunks in parallel
        tasks = [
            self.analyze_chunk(name, files)
            for name, files in pending.items()
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results
        for result in results:
            if isinstance(result, Exception):
                print(f"Task failed: {result}")
            elif result.error:
                print(f"[{result.name}] Error: {result.error}")
            else:
                self.state.results.append(result)

        return self._aggregate()

    def _aggregate(self) -> dict:
        """Aggregate all results."""
        total_files = sum(r.files_analyzed for r in self.state.results)
        all_issues = []
        for r in self.state.results:
            for issue in r.issues:
                all_issues.append({"chunk": r.name, "issue": issue})

        return {
            "total_files": total_files,
            "total_issues": len(all_issues),
            "issues": all_issues,
            "sessions": self.state.sessions
        }

    def _aggregate_cached(self) -> dict:
        """Aggregate from cached results."""
        # In practice, you'd load cached results
        return {
            "from_cache": True,
            "sessions": self.state.sessions,
            "completed_chunks": list(self.state.completed)
        }


async def main():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--analyze", required=True, help="Directory to analyze")
    parser.add_argument("--workers", type=int, default=5, help="Max concurrent workers")
    args = parser.parse_args()

    orchestrator = ClaudeOrchestrator(max_workers=args.workers)
    results = await orchestrator.run(Path(args.analyze))

    print("\n" + "=" * 50)
    print("RESULTS")
    print("=" * 50)
    print(json.dumps(results, indent=2))


if __name__ == "__main__":
    asyncio.run(main())

10.2 Resume Capability

class ResumableOrchestrator(ClaudeOrchestrator):
    async def resume_chunk(self, name: str, additional_prompt: str) -> ChunkResult:
        """Resume a specific chunk with additional context."""
        session_id = self.state.sessions.get(name)
        if not session_id:
            raise ValueError(f"No session found for {name}")

        async with self.semaphore:
            proc = await asyncio.create_subprocess_exec(
                "claude", "-p", additional_prompt,
                "--resume", session_id,
                "--output-format", "json",
                stdout=asyncio.subprocess.PIPE
            )

            stdout, _ = await proc.communicate()
            result = json.loads(stdout.decode())

            return ChunkResult(
                name=name,
                session_id=session_id,
                files_analyzed=0,
                issues=self._extract_issues(result)
            )
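
A possible usage sketch, assuming a previous run already saved a checkpoint.json that maps the "auth" chunk to a session ID (the follow-up question is illustrative):

async def ask_followup():
    orch = ResumableOrchestrator(max_workers=2)
    # Re-open the session that analyzed the auth chunk and ask a follow-up question
    result = await orch.resume_chunk(
        "auth",
        "Of the issues you found, which are exploitable without authentication?"
    )
    for issue in result.issues:
        print(issue)

asyncio.run(ask_followup())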

10.3 Progress Display

import sys
from datetime import datetime

class ProgressOrchestrator(ClaudeOrchestrator):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_time = None
        self.progress = {}

    async def analyze_chunk(self, name: str, files: list[str]) -> ChunkResult:
        self.progress[name] = {"status": "running", "start": datetime.now()}
        self._update_display()

        result = await super().analyze_chunk(name, files)

        self.progress[name]["status"] = "done" if not result.error else "error"
        self._update_display()

        return result

    def _update_display(self):
        """Update progress display."""
        # Clear and redraw
        sys.stdout.write("\033[H\033[J")  # Clear screen

        print("Worker Status:")
        print("-" * 40)

        for name, status in self.progress.items():
            if status["status"] == "running":
                elapsed = (datetime.now() - status["start"]).seconds
                print(f"  [{name}] Running... ({elapsed}s)")
            elif status["status"] == "done":
                print(f"  [{name}] Complete")
            else:
                print(f"  [{name}] ERROR")

        print("-" * 40)
        done = sum(1 for s in self.progress.values() if s["status"] == "done")
        total = len(self.progress)
        print(f"Progress: {done}/{total}")

11. Learning Milestones

Milestone  Description                       Verification
---------  -----------                       ------------
1          Partition works correctly         Files grouped by directory
2          Workers run in parallel           See concurrent output
3          Semaphore limits concurrency      Only N workers at once
4          Sessions are tracked              Check checkpoint.json
5          Results aggregate correctly       Sum matches individual
6          Resume works                      Run again, skip completed
7          Errors don't crash orchestrator   One failure, others continue

12. Common Pitfalls

12.1 Not Limiting Concurrency

# WRONG: No limit, will hit rate limits
tasks = [analyze(chunk) for chunk in chunks]
results = await asyncio.gather(*tasks)

# RIGHT: Use semaphore
semaphore = asyncio.Semaphore(5)
async def limited_analyze(chunk):
    async with semaphore:
        return await analyze(chunk)
tasks = [limited_analyze(chunk) for chunk in chunks]

12.2 Losing Results on Failure

# WRONG: All results lost if one fails
results = await asyncio.gather(*tasks)  # Raises on first error

# RIGHT: Capture exceptions, keep other results
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
    if isinstance(result, Exception):
        handle_error(result)
    else:
        process_result(result)

12.3 Not Saving Checkpoints

# WRONG: Lose progress on crash
async def run_all(chunks):
    results = await asyncio.gather(*[analyze(c) for c in chunks])
    return results

# RIGHT: Save after each chunk
async def run_all(chunks):
    results = []
    for chunk in chunks:
        result = await analyze(chunk)
        results.append(result)
        save_checkpoint(results)  # Save incrementally
    return results
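
If you want to keep the parallelism and still checkpoint incrementally, asyncio.as_completed lets you save each result as soon as its worker finishes (a sketch reusing the analyze and save_checkpoint placeholders above):

# BETTER: parallel AND incremental checkpoints
async def run_all(chunks):
    results = []
    tasks = [analyze(c) for c in chunks]
    for finished in asyncio.as_completed(tasks):
        result = await finished
        results.append(result)
        save_checkpoint(results)  # save as each worker completes
    return results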

12.4 Unbalanced Partitioning

# WRONG: Huge directories take forever
chunks = {d.name: [str(f) for f in d.rglob("*.py")] for d in base.iterdir() if d.is_dir()}
# One chunk has 500 files, others have 10

# RIGHT: Balance chunk sizes
def balanced_partition(files, chunk_size=20):
    return [files[i:i+chunk_size] for i in range(0, len(files), chunk_size)]

13. Extension Ideas

  1. Dynamic worker scaling: Add workers when queue grows, remove when empty
  2. Priority queues: Process security-critical files first
  3. Result caching: Skip unchanged files on re-run
  4. Metrics dashboard: Real-time visualization of worker status
  5. Distributed execution: Spread across multiple machines
  6. Smart chunking: Use file dependencies to group related files

14. Summary

This project teaches you to:

  • Orchestrate multiple Claude instances in parallel
  • Manage sessions for resumable workflows
  • Apply the scatter-gather pattern
  • Handle partial failures gracefully
  • Control concurrency with semaphores

The key insight is that Claude instances are independent, so you can run many in parallel. The challenge is coordinating them: dividing work, tracking progress, handling failures, and aggregating results.

Key takeaway: Parallel processing is about isolation and coordination. Isolate workers so failures don't cascade. Coordinate through shared state (checkpoints, session IDs) that enables resume. The result is an orchestrator that's both fast and resilient.