Project 25: Streaming JSON Pipeline - Real-Time Processing

Build a real-time processing pipeline using Claude's streaming JSON output: process large codebases file-by-file, stream results to a dashboard, handle long-running tasks with progress updates, and aggregate results incrementally.

Quick Reference

Attribute       Value
Difficulty      Advanced
Time Estimate   2 weeks
Language        Python (Alternatives: TypeScript/Node, Go)
Prerequisites   Project 24 completed, streaming data concepts
Key Topics      Streaming JSON, NDJSON, subprocess, real-time processing, incremental aggregation
Main Book       "Designing Data-Intensive Applications" by Martin Kleppmann

1. Learning Objectives

By completing this project, you will:

  1. Master streaming JSON output: Parse --output-format stream-json for real-time event processing
  2. Build incremental pipelines: Process Claude's output as it arrives, not after completion
  3. Implement progress tracking: Display real-time progress for long-running analyses
  4. Handle stream errors gracefully: Recover from partial failures without losing processed data
  5. Aggregate results incrementally: Build running totals and summaries as events arrive
  6. Design responsive CLIs: Create user interfaces that update in real-time
  7. Understand event-driven architecture: Apply streaming patterns to AI workflows

2. Real World Outcome

When complete, you'll have a pipeline that shows real-time progress:

$ python pipeline.py --input ./src --analyze security

Analyzing 47 files for security issues...

Progress: [============----------------] 42% (20/47)

Real-time results:
----- auth/login.ts
     - Line 45: SQL injection risk
     - Line 78: Hardcoded secret
----- api/users.ts
     - No issues found
----- utils/crypto.ts
     - Line 12: Weak hashing algorithm
...

[Live updates as Claude processes each file]

Summary:
- Files analyzed: 47
- Issues found: 12
- Critical: 3
- Warnings: 9
- Duration: 2m 34s

Streaming Output Format

Claude's stream-json mode emits newline-delimited JSON (NDJSON):

{"type": "start", "session_id": "abc123", "timestamp": "2024-01-15T10:30:00Z"}
{"type": "text", "content": "Analyzing auth/login.ts...\n"}
{"type": "text", "content": "Found SQL injection vulnerability at line 45"}
{"type": "tool_use", "tool_name": "Read", "tool_input": {"file_path": "src/api/users.ts"}}
{"type": "tool_result", "content": "// User API routes..."}
{"type": "text", "content": "Analyzing api/users.ts...\n"}
{"type": "result", "content": "Analysis complete", "cost": {"total": 0.0045}}

3. The Core Question Youโ€™re Answering

"How do I process Claude's output in real-time as it streams, enabling responsive pipelines for large-scale analysis?"

Waiting for complete output is slow for large tasks. Streaming JSON lets you process results incrementally, providing real-time feedback and faster time-to-first-result.

Why This Matters

Consider analyzing a large codebase:

  • Without streaming: Wait 5 minutes, then see all results at once
  • With streaming: See first result in 10 seconds, watch progress in real-time

User experience improvements:

  • Immediate feedback that the system is working
  • Ability to cancel if results look wrong
  • Progressive refinement of analysis

4. Concepts You Must Understand First

Stop and research these before coding:

4.1 Streaming JSON Format (NDJSON)

Newline-Delimited JSON (NDJSON) is a format where each line is a complete, valid JSON object:

{"event": "start", "data": "..."}
{"event": "progress", "percent": 25}
{"event": "progress", "percent": 50}
{"event": "result", "data": "..."}

Key differences from JSON arrays:

  • Each line can be parsed independently
  • No need to wait for closing bracket
  • Stream-friendly - process as lines arrive
  • Memory-efficient - no need to hold entire response
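
To make the contrast concrete, here is a minimal sketch of consuming both formats in Python (the file names and the handle() function are illustrative):

import json

def handle(event):                     # hypothetical per-event callback
    print(event)

# JSON array: the whole document must be read before any element is usable
with open("results.json") as f:
    for event in json.load(f):         # blocks until the entire array is parsed
        handle(event)

# NDJSON: each line parses on its own, so processing starts immediately
with open("results.ndjson") as f:
    for line in f:
        if line.strip():
            handle(json.loads(line))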

Key Questions:

  • How does NDJSON differ from a JSON array?
  • Why is NDJSON preferred for streaming?
  • How do you handle partial lines at the end of a buffer?

Reference: ndjson.org specification

4.2 Claude Code Stream Events

When using --output-format stream-json, Claude emits these event types:

┌──────────────────────────────────────────────────────────────────┐
│                        STREAM EVENT TYPES                        │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  "start"                                                         │
│  ├── Emitted when session begins                                 │
│  ├── Contains: session_id, timestamp                             │
│  └── Always first event                                          │
│                                                                  │
│  "text"                                                          │
│  ├── Claude's text output                                        │
│  ├── Contains: content (string)                                  │
│  └── Emitted as Claude generates text                            │
│                                                                  │
│  "tool_use"                                                      │
│  ├── Claude is calling a tool                                    │
│  ├── Contains: tool_name, tool_input                             │
│  └── Indicates file read, shell command, etc.                    │
│                                                                  │
│  "tool_result"                                                   │
│  ├── Result from tool execution                                  │
│  ├── Contains: content (tool output)                             │
│  └── Follows corresponding tool_use                              │
│                                                                  │
│  "result"                                                        │
│  ├── Final summary                                               │
│  ├── Contains: content, cost, duration                           │
│  └── Always last event                                           │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘

Reference: Claude Code documentation - โ€œOutput Formatsโ€
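
Correlating tool_use with tool_result (the JOIN pattern in section 4.3) can be done with a one-slot buffer. A minimal sketch, assuming the event shapes shown above and that each tool_result follows its tool_use:

import json

def correlate_tool_events(lines):
    """Yield (tool_use, tool_result) pairs from an NDJSON event stream."""
    pending = None                      # the tool_use awaiting its result
    for line in lines:
        if not line.strip():
            continue
        event = json.loads(line)
        if event.get("type") == "tool_use":
            pending = event
        elif event.get("type") == "tool_result" and pending is not None:
            yield pending, event        # matched call/result pair
            pending = None

# Usage: for call, result in correlate_tool_events(proc.stdout): ...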

4.3 Stream Processing Patterns

From "Designing Data-Intensive Applications" Chapter 11:

┌──────────────────────────────────────────────────────────────────┐
│                    STREAM PROCESSING PATTERNS                    │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  FILTER                                                          │
│  ├── Pass through only events matching criteria                  │
│  └── Example: Only "text" events containing "error"              │
│                                                                  │
│  MAP                                                             │
│  ├── Transform each event                                        │
│  └── Example: Extract issue details from text                    │
│                                                                  │
│  AGGREGATE                                                       │
│  ├── Combine events into running totals                          │
│  └── Example: Count issues, sum severity                         │
│                                                                  │
│  WINDOW                                                          │
│  ├── Group events by time or count                               │
│  └── Example: "Last 10 events" for display                       │
│                                                                  │
│  JOIN                                                            │
│  ├── Correlate events from multiple streams                      │
│  └── Example: Match tool_use with tool_result                    │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘
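
These patterns map naturally onto Python generators, which pull one event at a time and never buffer the whole stream. A minimal sketch chaining FILTER, MAP, and AGGREGATE (the error-counting heuristic is illustrative):

import json

def parse(lines):                        # source: NDJSON lines -> event dicts
    for line in lines:
        if line.strip():
            yield json.loads(line)

def only_text(events):                   # FILTER: keep only "text" events
    return (e for e in events if e.get("type") == "text")

def contents(events):                    # MAP: event -> its content string
    return (e.get("content", "") for e in events)

def count_errors(texts):                 # AGGREGATE: running error count
    total = 0
    for text in texts:
        if "error" in text.lower():
            total += 1
            print(f"\rErrors so far: {total}", end="", flush=True)
    return total

# Compose the pipeline; nothing runs until count_errors starts pulling:
# total = count_errors(contents(only_text(parse(proc.stdout))))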

5. Questions to Guide Your Design

Before implementing, think through these:

5.1 What to Stream?

Data Type              Update Frequency   Display Strategy
File analysis results  Per file           Append to list
Progress percentage    Every event        Update in place
Running totals         Every issue found  Update counter
Error messages         Immediately        Highlight and show

5.2 How to Display Progress?

# Option 1: Simple percentage
print(f"\rProgress: {percent}%", end="")

# Option 2: Progress bar
print(f"\r[{'=' * filled}{'.' * empty}] {percent}%", end="")

# Option 3: Rich library for fancy output
from rich.progress import Progress
with Progress() as progress:
    task = progress.add_task("Analyzing...", total=file_count)
    progress.update(task, advance=1)

5.3 How to Handle Errors?

┌──────────────────────────────────────────────────────────────────┐
│                     ERROR HANDLING STRATEGY                      │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Stream interrupted                                              │
│       │                                                          │
│       ├── Network error ──► Retry from last checkpoint           │
│       │                                                          │
│       ├── Claude error ──► Log, return partial results           │
│       │                                                          │
│       └── Parse error ──► Skip malformed line, continue          │
│                                                                  │
│  Partial line at buffer end                                      │
│       │                                                          │
│       └── Buffer until newline, then parse                       │
│                                                                  │
│  Invalid JSON in line                                            │
│       │                                                          │
│       └── Log error, skip line, continue processing              │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘
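
For the "retry from last checkpoint" branch above, a minimal checkpointing sketch; it assumes the AnalysisState fields used in section 10.1, and the checkpoint path is arbitrary:

import json
from pathlib import Path

CHECKPOINT = Path("checkpoint.json")   # hypothetical location

def save_checkpoint(state):
    """Persist progress after each file so an interrupted run can resume."""
    CHECKPOINT.write_text(json.dumps({
        "files_processed": state.files_processed,
        "current_file": state.current_file,
        "issues": state.issues,
    }))

def load_checkpoint():
    """Return saved progress, or None on a fresh run."""
    if CHECKPOINT.exists():
        return json.loads(CHECKPOINT.read_text())
    return None

# On restart: load the checkpoint, skip files already recorded,
# and re-run Claude only on the remainder.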

6. Thinking Exercise

Design the Streaming Pipeline

┌──────────────────────────────────────────────────────────────────┐
│                        STREAMING PIPELINE                        │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Input: 47 source files                                          │
│       │                                                          │
│       ▼                                                          │
│  ┌─────────────────┐                                             │
│  │ CLAUDE -p       │ --output-format stream-json                 │
│  │ (streaming)     │                                             │
│  └────────┬────────┘                                             │
│           │                                                      │
│           │ ┌─────── Stream events ────────┐                     │
│           │ │                              │                     │
│           ▼ ▼                              │                     │
│  ┌─────────────────┐                       │                     │
│  │ LINE PARSER     │ Read NDJSON lines     │                     │
│  │ json.loads()    │ one at a time         │                     │
│  └────────┬────────┘                       │                     │
│           │                                │                     │
│           ├────────────────────────────────┤                     │
│           │                                │                     │
│           ▼                                ▼                     │
│  ┌─────────────────┐            ┌──────────────────┐             │
│  │ DISPLAY HANDLER │            │ AGGREGATE HANDLER│             │
│  │ - Progress bar  │            │ - Issue counts   │             │
│  │ - File results  │            │ - Running totals │             │
│  │ - Error alerts  │            │ - Statistics     │             │
│  └────────┬────────┘            └────────┬─────────┘             │
│           │                              │                       │
│           └──────────────┬───────────────┘                       │
│                          │                                       │
│                          ▼                                       │
│               ┌─────────────────┐                                │
│               │ FINAL REPORT    │ Complete summary               │
│               │ - Total issues  │                                │
│               │ - By severity   │                                │
│               │ - Duration      │                                │
│               └─────────────────┘                                │
│                                                                  │
└──────────────────────────────────────────────────────────────────┘

Design Questions

  1. Whatโ€™s in each streaming event?
    • Parse the type field to determine handler
    • Extract relevant data based on event type
    • Correlate tool_use with tool_result
  2. How do you know when a file is done?
    • Track "Analyzing X…" patterns in text
    • Watch for tool_use on next file
    • Infer from file path changes
  3. How do you handle stream interruption?
    • Save processed results incrementally
    • Track last successfully processed file
    • Enable resume from checkpoint

7. The Interview Questions They'll Ask

7.1 "What's streaming JSON and when would you use it?"

Good answer: Streaming JSON (typically NDJSON) is a format where each line is a complete JSON object. Unlike JSON arrays, you don't need to wait for the entire response to start processing. Use it when:

  • Processing large datasets that donโ€™t fit in memory
  • Need real-time feedback (progress bars, live results)
  • Building responsive user interfaces
  • Handling long-running operations

7.2 "How do you process data incrementally as it arrives?"

Key techniques:

  • Read stdout line-by-line using subprocess.PIPE
  • Parse each line independently with json.loads()
  • Update aggregates with each new data point
  • Use async/await for non-blocking I/O

7.3 "What are the challenges of stream-based error handling?"

Challenges and solutions:

  • Partial lines: Buffer until newline, then parse
  • Malformed JSON: Log and skip, don't crash
  • Stream interruption: Save partial results, enable resume
  • Out-of-order events: Use sequence numbers or timestamps

7.4 "How do you build responsive UIs with streaming backends?"

Approaches:

  • Update display on each event (not just at end)
  • Use carriage return (\r) for in-place updates
  • Buffer updates if they arrive faster than render rate (see the sketch below)
  • Separate parse thread from display thread if needed
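
A minimal sketch of the buffering idea: cap redraws at a fixed frame rate and always render the final state (the class name and rate are arbitrary):

import time

class ThrottledDisplay:
    """Redraw at most fps times per second, however fast events arrive."""

    def __init__(self, fps: float = 10.0):
        self.min_interval = 1.0 / fps
        self.last_render = 0.0

    def update(self, message: str, final: bool = False):
        now = time.monotonic()
        if final or now - self.last_render >= self.min_interval:
            print(f"\r{message}", end="", flush=True)
            self.last_render = now

# Call update() on every event; intermediate frames are dropped,
# and final=True guarantees the last state is shown.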

7.5 "What's NDJSON and how does it differ from JSON arrays?"

Comparison:

Aspect     JSON Array                  NDJSON
Format     [{...}, {...}]              {...}\n{...}\n
Parsing    Wait for entire array       Parse line by line
Memory     Hold entire array           Process one event
Streaming  Not supported               Native
Schema     Uniform (array of objects)  Each line independent

8. Hints in Layers

Hint 1: Use subprocess.PIPE

Start with the simplest streaming approach:

import subprocess

proc = subprocess.Popen(
    ["claude", "-p", "Analyze code", "--output-format", "stream-json"],
    stdout=subprocess.PIPE,
    text=True
)

for line in proc.stdout:
    print(f"Got line: {line[:50]}...")

Hint 2: Parse Each Line as JSON

Each line is a complete JSON object:

import json

for line in proc.stdout:
    if not line.strip():
        continue
    event = json.loads(line)
    print(f"Event type: {event.get('type')}")

Hint 3: Watch for Event Types

Handle different events appropriately:

for line in proc.stdout:
    if not line.strip():
        continue
    event = json.loads(line)

    match event.get("type"):
        case "start":
            print(f"Session started: {event['session_id']}")
        case "text":
            process_text(event["content"])
        case "tool_use":
            print(f"Using tool: {event['tool_name']}")
        case "result":
            print(f"Complete! Cost: {event['cost']}")

Hint 4: Track State Incrementally

Maintain running totals:

class StreamProcessor:
    def __init__(self):
        self.files_processed = 0
        self.issues = []
        self.current_file = None

    def process(self, event):
        if event["type"] == "text":
            content = event["content"]
            if content.startswith("Analyzing "):
                # "Analyzing auth/login.ts..." -> "auth/login.ts"
                self.current_file = content.split()[1].rstrip(".")
                self.files_processed += 1
            elif "issue" in content.lower():
                self.issues.append({
                    "file": self.current_file,
                    "description": content
                })

        self.update_display()

    def update_display(self):
        print(f"\rFiles: {self.files_processed} | Issues: {len(self.issues)}", end="")

9. Books That Will Help

Topic              Book                                                  Chapter/Section
Stream processing  "Designing Data-Intensive Applications" by Kleppmann  Ch. 11: Stream Processing
Real-time systems  "Streaming Systems" by Akidau, Chernyak, Lax          Ch. 1-3: Fundamentals
Python async       "Fluent Python" by Luciano Ramalho                    Ch. 21: Concurrency with Futures
Python async       "Fluent Python" by Luciano Ramalho                    Ch. 22: Asynchronous Programming
Event-driven       "Reactive Design Patterns" by Kuhn                    Ch. 2-4: Event patterns

10. Implementation Guide

10.1 Basic Streaming Pipeline

import subprocess
import json
import sys
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class AnalysisState:
    files_processed: int = 0
    total_files: int = 0
    issues: list = field(default_factory=list)
    current_file: Optional[str] = None
    session_id: Optional[str] = None

def run_streaming_analysis(files: list[str]) -> AnalysisState:
    """Run Claude analysis with streaming output."""
    prompt = f"""Analyze these files for security issues.
    Report each file as you analyze it.
    Files: {' '.join(files)}"""

    proc = subprocess.Popen(
        ["claude", "-p", prompt, "--output-format", "stream-json"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )

    state = AnalysisState(total_files=len(files))

    try:
        for line in proc.stdout:
            if not line.strip():
                continue

            try:
                event = json.loads(line)
                process_event(event, state)
            except json.JSONDecodeError as e:
                print(f"\nWarning: Could not parse: {line[:50]}...", file=sys.stderr)

    except KeyboardInterrupt:
        print("\n\nInterrupted by user. Partial results:")
        proc.terminate()

    proc.wait()
    return state

def process_event(event: dict, state: AnalysisState):
    """Process a single streaming event."""
    event_type = event.get("type")

    if event_type == "start":
        state.session_id = event.get("session_id")
        print(f"Session started: {state.session_id}")

    elif event_type == "text":
        content = event.get("content", "")
        handle_text(content, state)

    elif event_type == "tool_use":
        tool = event.get("tool_name")
        if tool == "Read":
            file_path = event.get("tool_input", {}).get("file_path")
            if file_path:
                state.current_file = file_path
                state.files_processed += 1
                update_progress(state)

    elif event_type == "result":
        cost = event.get("cost", {})
        print("\n\nAnalysis complete!")
        print(f"Total cost: ${cost.get('total', 0):.4f}")

def handle_text(content: str, state: AnalysisState):
    """Handle text output, extracting issues."""
    # Look for issue patterns
    issue_keywords = ["vulnerability", "issue", "warning", "error", "risk"]
    if any(kw in content.lower() for kw in issue_keywords):
        state.issues.append({
            "file": state.current_file,
            "description": content.strip()
        })
        print(f"\n  Issue: {content.strip()[:60]}...")

def update_progress(state: AnalysisState):
    """Update the progress display."""
    percent = (state.files_processed / state.total_files * 100) if state.total_files else 0
    bar_width = 30
    filled = int(bar_width * percent / 100)
    bar = "=" * filled + "-" * (bar_width - filled)
    print(f"\r[{bar}] {percent:.0f}% ({state.files_processed}/{state.total_files})", end="")

def main():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True, help="Directory to analyze")
    args = parser.parse_args()

    # Find files
    from pathlib import Path
    files = list(Path(args.input).glob("**/*.py"))
    print(f"Found {len(files)} Python files to analyze\n")

    # Run analysis
    state = run_streaming_analysis([str(f) for f in files])

    # Print summary
    print("\n" + "=" * 50)
    print("SUMMARY")
    print("=" * 50)
    print(f"Files analyzed: {state.files_processed}")
    print(f"Issues found: {len(state.issues)}")
    for issue in state.issues:
        print(f"  - {issue['file']}: {issue['description'][:50]}...")

if __name__ == "__main__":
    main()

10.2 Async Version for Better Responsiveness

import asyncio
import json

async def run_async_analysis(files: list[str]):
    """Async version for better responsiveness."""
    prompt = f"Analyze: {' '.join(files)}"

    proc = await asyncio.create_subprocess_exec(
        "claude", "-p", prompt, "--output-format", "stream-json",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    state = {"files": 0, "issues": []}

    async for line in proc.stdout:
        line = line.decode().strip()
        if not line:
            continue

        try:
            event = json.loads(line)
            await process_event_async(event, state)
        except json.JSONDecodeError:
            pass

    await proc.wait()
    return state

async def process_event_async(event: dict, state: dict):
    """Async event processing."""
    if event.get("type") == "text":
        content = event.get("content", "")
        print(content, end="", flush=True)

        if "issue" in content.lower():
            state["issues"].append(content)

# Run with: asyncio.run(run_async_analysis(files))

10.3 Rich Terminal Output

from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.live import Live
from rich.table import Table

console = Console()

def run_with_rich_output(files: list[str]):
    """Use Rich library for beautiful terminal output."""
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        transient=True,
    ) as progress:
        task = progress.add_task("Analyzing...", total=len(files))

        # stream_results() is a placeholder: wire in the event loop from 10.1
        for result in stream_results():
            progress.update(task, advance=1)
            if result.has_issue:
                console.print(f"[red]Issue:[/red] {result.description}")

11. Learning Milestones

Milestone  Description                         Verification
1          Stream events arrive in real-time   See events as they occur, not at end
2          Parse NDJSON correctly              Each line parsed independently
3          Progress updates during processing  Bar updates as files complete
4          Handle stream interruption          Ctrl+C gives partial results
5          Aggregate correctly                 Final counts match event counts
6          Error recovery works                Malformed JSON doesn't crash
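
Milestones 2 and 6 can be verified with a small self-contained test; a sketch (parse_ndjson is an illustrative helper, not part of the project code):

import json

def parse_ndjson(lines):
    """Parse NDJSON lines, skipping blanks and malformed entries."""
    events = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            events.append(json.loads(line))
        except json.JSONDecodeError:
            continue                     # must not crash the pipeline
    return events

def test_malformed_line_does_not_crash():
    lines = ['{"type": "text", "content": "ok"}', "{not json", ""]
    events = parse_ndjson(lines)
    assert len(events) == 1 and events[0]["type"] == "text"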

12. Common Pitfalls

12.1 Blocking on Full Output

# WRONG: Waits for complete output
result = subprocess.run(["claude", "-p", "...", "--output-format", "stream-json"],
                       capture_output=True, text=True)
# By the time you get result.stdout, it's all there

# RIGHT: Stream as it arrives
proc = subprocess.Popen(["claude", "-p", "...", "--output-format", "stream-json"],
                        stdout=subprocess.PIPE, text=True)
for line in proc.stdout:  # Yields lines as they arrive
    process(line)

12.2 Not Flushing Output

# WRONG: Progress updates may buffer
print(f"\rProgress: {percent}%", end="")

# RIGHT: Flush to ensure immediate display
print(f"\rProgress: {percent}%", end="", flush=True)

# Or use sys.stdout directly
sys.stdout.write(f"\rProgress: {percent}%")
sys.stdout.flush()

12.3 Ignoring Parse Errors

# WRONG: Crash on malformed JSON
for line in proc.stdout:
    event = json.loads(line)  # May raise JSONDecodeError

# RIGHT: Handle gracefully
for line in proc.stdout:
    try:
        event = json.loads(line)
        process(event)
    except json.JSONDecodeError as e:
        logging.warning(f"Skipped malformed line: {e}")
        continue

12.4 Not Handling Partial Lines

Iterating with for line in proc.stdout yields complete lines, so the real risk is reading raw chunks (non-blocking or fixed-size reads), where a chunk can end mid-object:

# WRONG: Parse raw chunks directly
chunk = proc.stdout.read(4096)
event = json.loads(chunk)  # Chunk may end mid-line

# RIGHT: Buffer chunks, parse only complete lines
buffer = ""
while chunk := proc.stdout.read(4096):
    buffer += chunk
    while "\n" in buffer:
        line, buffer = buffer.split("\n", 1)
        if line.strip():
            event = json.loads(line)

13. Extension Ideas

  1. WebSocket dashboard: Stream results to a web UI in real-time
  2. Multi-file correlation: Track issues across related files
  3. Severity aggregation: Weight issues by severity for summary
  4. Resume capability: Save checkpoints, resume interrupted analysis
  5. Parallel streams: Process multiple files with multiple Claude instances (see the sketch below)
  6. Metric recording: Track performance over time for optimization
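
For extension 5, a minimal sketch of bounded parallelism with asyncio: one Claude subprocess per file, with a semaphore so you don't launch everything at once (the prompt and limit are illustrative):

import asyncio
import json

async def analyze_one(file: str) -> list[str]:
    """Run one Claude instance on one file and collect its text events."""
    proc = await asyncio.create_subprocess_exec(
        "claude", "-p", f"Analyze {file} for security issues",
        "--output-format", "stream-json",
        stdout=asyncio.subprocess.PIPE,
    )
    texts = []
    async for raw in proc.stdout:
        line = raw.decode().strip()
        if line:
            event = json.loads(line)
            if event.get("type") == "text":
                texts.append(event.get("content", ""))
    await proc.wait()
    return texts

async def analyze_parallel(files: list[str], limit: int = 4):
    """Analyze files concurrently, at most `limit` subprocesses at a time."""
    sem = asyncio.Semaphore(limit)

    async def bounded(f: str):
        async with sem:
            return await analyze_one(f)

    return await asyncio.gather(*(bounded(f) for f in files))

# Run with: asyncio.run(analyze_parallel(["a.py", "b.py"]))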

14. Summary

This project teaches you to:

  • Use --output-format stream-json for real-time Claude output
  • Parse NDJSON (newline-delimited JSON) incrementally
  • Build responsive CLIs with live progress updates
  • Handle streaming errors gracefully
  • Aggregate results incrementally

The key insight is that streaming transforms Claude from a batch processor into an event source. Instead of waiting for complete results, you can react to each event as it arrives - enabling better user experience, faster feedback, and more resilient processing.

Key takeaway: Stream processing is about events, not files. Each JSON line is an event that tells you something about Claude's progress. Design your system around these events, and you'll build responsive, robust pipelines that handle large-scale analysis gracefully.