Project 25: Streaming JSON Pipeline - Real-Time Processing
Build a real-time processing pipeline using Claude's streaming JSON output: process large codebases file-by-file, stream results to a dashboard, handle long-running tasks with progress updates, and aggregate results incrementally.
Quick Reference
| Attribute | Value |
|---|---|
| Difficulty | Advanced |
| Time Estimate | 2 weeks |
| Language | Python (Alternatives: TypeScript/Node, Go) |
| Prerequisites | Project 24 completed, streaming data concepts |
| Key Topics | Streaming JSON, NDJSON, subprocess, real-time processing, incremental aggregation |
| Main Book | "Designing Data-Intensive Applications" by Martin Kleppmann |
1. Learning Objectives
By completing this project, you will:
- Master streaming JSON output: Parse `--output-format stream-json` for real-time event processing
- Build incremental pipelines: Process Claude's output as it arrives, not after completion
- Implement progress tracking: Display real-time progress for long-running analyses
- Handle stream errors gracefully: Recover from partial failures without losing processed data
- Aggregate results incrementally: Build running totals and summaries as events arrive
- Design responsive CLIs: Create user interfaces that update in real-time
- Understand event-driven architecture: Apply streaming patterns to AI workflows
2. Real World Outcome
When complete, you'll have a pipeline that shows real-time progress:
$ python pipeline.py --input ./src --analyze security
Analyzing 47 files for security issues...
Progress: [============------------------] 42% (20/47)
Real-time results:
----- auth/login.ts
- Line 45: SQL injection risk
- Line 78: Hardcoded secret
----- api/users.ts
- No issues found
----- utils/crypto.ts
- Line 12: Weak hashing algorithm
...
[Live updates as Claude processes each file]
Summary:
- Files analyzed: 47
- Issues found: 12
- Critical: 3
- Warnings: 9
- Duration: 2m 34s
Streaming Output Format
Claude's stream-json mode emits newline-delimited JSON (NDJSON):
{"type": "start", "session_id": "abc123", "timestamp": "2024-01-15T10:30:00Z"}
{"type": "text", "content": "Analyzing auth/login.ts...\n"}
{"type": "text", "content": "Found SQL injection vulnerability at line 45"}
{"type": "tool_use", "tool_name": "Read", "tool_input": {"file_path": "src/api/users.ts"}}
{"type": "tool_result", "content": "// User API routes..."}
{"type": "text", "content": "Analyzing api/users.ts...\n"}
{"type": "result", "content": "Analysis complete", "cost": {"total": 0.0045}}
3. The Core Question You're Answering
"How do I process Claude's output in real-time as it streams, enabling responsive pipelines for large-scale analysis?"
Waiting for complete output is slow for large tasks. Streaming JSON lets you process results incrementally, providing real-time feedback and faster time-to-first-result.
Why This Matters
Consider analyzing a large codebase:
- Without streaming: Wait 5 minutes, then see all results at once
- With streaming: See first result in 10 seconds, watch progress in real-time
User experience improvements:
- Immediate feedback that the system is working
- Ability to cancel if results look wrong
- Progressive refinement of analysis
4. Concepts You Must Understand First
Stop and research these before coding:
4.1 Streaming JSON Format (NDJSON)
Newline-Delimited JSON (NDJSON) is a format where each line is a complete, valid JSON object:
{"event": "start", "data": "..."}
{"event": "progress", "percent": 25}
{"event": "progress", "percent": 50}
{"event": "result", "data": "..."}
Key differences from JSON arrays:
- Each line can be parsed independently
- No need to wait for closing bracket
- Stream-friendly - process as lines arrive
- Memory-efficient - no need to hold entire response
Key Questions:
- How does NDJSON differ from a JSON array?
- Why is NDJSON preferred for streaming?
- How do you handle partial lines at the end of a buffer? (see the sketch below)
Reference: ndjson.org specification
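To make the third question concrete, here is a minimal sketch of buffering partial lines when reading raw chunks from a text stream (the function name and 4096-character chunk size are illustrative, not part of any library):

```python
import json

def parse_ndjson_chunks(stream, chunk_size=4096):
    """Yield parsed events from a text stream, buffering partial lines."""
    buffer = ""
    while chunk := stream.read(chunk_size):
        buffer += chunk
        # Everything before a newline is a complete line; parse it now
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            if line.strip():
                yield json.loads(line)
        # A trailing partial line (no newline yet) stays in the buffer
```

Try it with `io.StringIO('{"a": 1}\n{"b": 2}\n')`: both events come out even if the chunk size splits a line in half.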
4.2 Claude Code Stream Events
When using --output-format stream-json, Claude emits these event types:
STREAM EVENT TYPES

- "start"
  - Emitted when the session begins
  - Contains: session_id, timestamp
  - Always the first event
- "text"
  - Claude's text output
  - Contains: content (string)
  - Emitted as Claude generates text
- "tool_use"
  - Claude is calling a tool
  - Contains: tool_name, tool_input
  - Indicates a file read, shell command, etc.
- "tool_result"
  - Result from tool execution
  - Contains: content (the tool's output)
  - Follows the corresponding tool_use
- "result"
  - Final summary
  - Contains: content, cost, duration
  - Always the last event
Reference: Claude Code documentation - "Output Formats"
4.3 Stream Processing Patterns
From "Designing Data-Intensive Applications" Chapter 11:
STREAM PROCESSING PATTERNS

- FILTER: pass through only events matching criteria (e.g., only "text" events containing "error")
- MAP: transform each event (e.g., extract issue details from text)
- AGGREGATE: combine events into running totals (e.g., count issues, sum severity)
- WINDOW: group events by time or count (e.g., the last 10 events for display)
- JOIN: correlate events from multiple streams (e.g., match tool_use with tool_result; see the generator sketch below)
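These patterns map naturally onto Python generators, which compose lazily and so preserve the streaming property. A minimal sketch of FILTER and JOIN over event dicts shaped like the examples in 4.2 (the pairing logic is deliberately simplified):

```python
def filter_errors(events):
    """FILTER: keep only text events that mention an error."""
    for event in events:
        if event.get("type") == "text" and "error" in event.get("content", "").lower():
            yield event

def join_tool_calls(events):
    """JOIN: pair each tool_use with the tool_result that follows it."""
    pending = None
    for event in events:
        if event.get("type") == "tool_use":
            pending = event
        elif event.get("type") == "tool_result" and pending is not None:
            yield pending, event
            pending = None
```

Because each stage is a generator, stages can be chained and still process every event the moment it arrives.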
5. Questions to Guide Your Design
Before implementing, think through these:
5.1 What to Stream?
| Data Type | Update Frequency | Display Strategy |
|---|---|---|
| File analysis results | Per file | Append to list |
| Progress percentage | Every event | Update in place |
| Running totals | Every issue found | Update counter |
| Error messages | Immediately | Highlight and show |
5.2 How to Display Progress?
# Option 1: Simple percentage
print(f"\rProgress: {percent}%", end="")
# Option 2: Progress bar
print(f"\r[{'=' * filled}{'.' * empty}] {percent}%", end="")
# Option 3: Rich library for fancy output
from rich.progress import Progress
with Progress() as progress:
task = progress.add_task("Analyzing...", total=file_count)
progress.update(task, advance=1)
5.3 How to Handle Errors?
ERROR HANDLING STRATEGY

- Stream interrupted
  - Network error → retry from the last checkpoint
  - Claude error → log and return partial results
  - Parse error → skip the malformed line and continue
- Partial line at buffer end
  - Buffer until newline, then parse
- Invalid JSON in a line
  - Log the error, skip the line, continue processing
6. Thinking Exercise
Design the Streaming Pipeline
STREAMING PIPELINE

1. Input: 47 source files
2. CLAUDE -p (streaming) with --output-format stream-json emits stream events
3. LINE PARSER: read NDJSON lines one at a time and parse each with json.loads()
4. Parsed events fan out to two handlers:
   - DISPLAY HANDLER: progress bar, file results, error alerts
   - AGGREGATE HANDLER: issue counts, running totals, statistics
5. FINAL REPORT: complete summary with total issues, counts by severity, and duration
Design Questions
- What's in each streaming event?
  - Parse the type field to determine the handler
  - Extract relevant data based on event type
  - Correlate tool_use with tool_result
- How do you know when a file is done?
  - Track "Analyzing X…" patterns in the text
  - Watch for a tool_use on the next file
  - Infer from file path changes
- How do you handle stream interruption? (see the checkpoint sketch after this list)
  - Save processed results incrementally
  - Track the last successfully processed file
  - Enable resume from checkpoint
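One way to make interruption survivable is to append each completed file's results to a checkpoint file and skip those files on restart. A minimal sketch (the checkpoint path and record shape are assumptions for illustration):

```python
import json
from pathlib import Path

CHECKPOINT = Path("checkpoint.ndjson")  # hypothetical location

def load_done() -> set[str]:
    """Return the set of files completed in a previous run."""
    if not CHECKPOINT.exists():
        return set()
    lines = CHECKPOINT.read_text().splitlines()
    return {json.loads(line)["file"] for line in lines if line.strip()}

def record_done(file: str, issues: list) -> None:
    """Append one completed file's results; NDJSON makes appends safe."""
    with CHECKPOINT.open("a") as f:
        f.write(json.dumps({"file": file, "issues": issues}) + "\n")
```

On resume, drop every file in `load_done()` from the input list before building the prompt.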
7. The Interview Questions They'll Ask
7.1 "What's streaming JSON and when would you use it?"
Good answer: Streaming JSON (typically NDJSON) is a format where each line is a complete JSON object. Unlike JSON arrays, you don't need to wait for the entire response to start processing. Use it when:
- Processing large datasets that donโt fit in memory
- Need real-time feedback (progress bars, live results)
- Building responsive user interfaces
- Handling long-running operations
7.2 "How do you process data incrementally as it arrives?"
Key techniques:
- Read stdout line-by-line using `subprocess.PIPE`
- Parse each line independently with `json.loads()`
- Update aggregates with each new data point
- Use async/await for non-blocking I/O
7.3 "What are the challenges of stream-based error handling?"
Challenges and solutions:
- Partial lines: Buffer until newline, then parse
- Malformed JSON: Log and skip, don't crash
- Stream interruption: Save partial results, enable resume
- Out-of-order events: Use sequence numbers or timestamps
7.4 "How do you build responsive UIs with streaming backends?"
Approaches:
- Update display on each event (not just at end)
- Use carriage return (`\r`) for in-place updates
- Buffer updates if they arrive faster than the render rate (see the throttling sketch below)
- Separate the parse thread from the display thread if needed
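A simple way to buffer updates that arrive faster than the render rate is to throttle redraws by wall-clock time; a sketch (the 0.1-second interval is an arbitrary choice):

```python
import sys
import time

class ThrottledDisplay:
    """Redraw at most once per interval, always showing the latest state."""

    def __init__(self, interval: float = 0.1):
        self.interval = interval
        self.last_draw = 0.0

    def update(self, text: str, final: bool = False) -> None:
        now = time.monotonic()
        if final or now - self.last_draw >= self.interval:
            sys.stdout.write(f"\r{text}")
            sys.stdout.flush()
            self.last_draw = now
```

Call `update()` on every event, and `update(..., final=True)` once at the end so the last state is never dropped.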
7.5 "What's NDJSON and how does it differ from JSON arrays?"
Comparison:
| Aspect | JSON Array | NDJSON |
|---|---|---|
| Format | `[{...}, {...}]` | `{...}\n{...}\n` |
| Parsing | Wait for entire array | Parse line by line |
| Memory | Hold entire array | Process one event |
| Streaming | Not supported | Native |
| Schema | Uniform (array of objects) | Each line independent |
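The practical difference shows up in when you can act on the first element (inline data for illustration):

```python
import json

array_text = '[{"n": 1}, {"n": 2}]'
ndjson_text = '{"n": 1}\n{"n": 2}\n'

# JSON array: nothing is usable until the whole document has arrived and parsed
events = json.loads(array_text)

# NDJSON: each line is usable the moment it arrives
for line in ndjson_text.splitlines():
    if line.strip():
        event = json.loads(line)  # act on it immediately
```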
8. Hints in Layers
Hint 1: Use subprocess.PIPE
Start with the simplest streaming approach:
import subprocess
proc = subprocess.Popen(
["claude", "-p", "Analyze code", "--output-format", "stream-json"],
stdout=subprocess.PIPE,
text=True
)
for line in proc.stdout:
print(f"Got line: {line[:50]}...")
Hint 2: Parse Each Line as JSON
Each line is a complete JSON object:
import json
for line in proc.stdout:
if not line.strip():
continue
event = json.loads(line)
print(f"Event type: {event.get('type')}")
Hint 3: Watch for Event Types
Handle different events appropriately:
for line in proc.stdout:
if not line.strip():
continue
event = json.loads(line)
match event.get("type"):
case "start":
print(f"Session started: {event['session_id']}")
case "text":
process_text(event["content"])
case "tool_use":
print(f"Using tool: {event['tool_name']}")
case "result":
print(f"Complete! Cost: {event['cost']}")
Hint 4: Track State Incrementally
Maintain running totals:
class StreamProcessor:
def __init__(self):
self.files_processed = 0
self.issues = []
self.current_file = None
def process(self, event):
if event["type"] == "text":
content = event["content"]
if content.startswith("Analyzing "):
self.current_file = content.split()[1]
self.files_processed += 1
elif "issue" in content.lower():
self.issues.append({
"file": self.current_file,
"description": content
})
self.update_display()
def update_display(self):
print(f"\rFiles: {self.files_processed} | Issues: {len(self.issues)}", end="")
9. Books That Will Help
| Topic | Book | Chapter/Section |
|---|---|---|
| Stream processing | "Designing Data-Intensive Applications" by Kleppmann | Ch. 11: Stream Processing |
| Real-time systems | "Streaming Systems" by Akidau, Chernyak, Lax | Ch. 1-3: Fundamentals |
| Python async | "Fluent Python" by Luciano Ramalho | Ch. 21: Concurrency with Futures |
| Python async | "Fluent Python" by Luciano Ramalho | Ch. 22: Asynchronous Programming |
| Event-driven | "Reactive Design Patterns" by Kuhn | Ch. 2-4: Event patterns |
10. Implementation Guide
10.1 Basic Streaming Pipeline
import subprocess
import json
import sys
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class AnalysisState:
files_processed: int = 0
total_files: int = 0
issues: list = field(default_factory=list)
current_file: Optional[str] = None
session_id: Optional[str] = None
def run_streaming_analysis(files: list[str]) -> AnalysisState:
"""Run Claude analysis with streaming output."""
prompt = f"""Analyze these files for security issues.
Report each file as you analyze it.
Files: {' '.join(files)}"""
proc = subprocess.Popen(
["claude", "-p", prompt, "--output-format", "stream-json"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
state = AnalysisState(total_files=len(files))
try:
for line in proc.stdout:
if not line.strip():
continue
try:
event = json.loads(line)
process_event(event, state)
except json.JSONDecodeError as e:
print(f"\nWarning: Could not parse: {line[:50]}...", file=sys.stderr)
except KeyboardInterrupt:
print("\n\nInterrupted by user. Partial results:")
proc.terminate()
proc.wait()
return state
def process_event(event: dict, state: AnalysisState):
"""Process a single streaming event."""
event_type = event.get("type")
if event_type == "start":
state.session_id = event.get("session_id")
print(f"Session started: {state.session_id}")
elif event_type == "text":
content = event.get("content", "")
handle_text(content, state)
elif event_type == "tool_use":
tool = event.get("tool_name")
if tool == "Read":
file_path = event.get("tool_input", {}).get("file_path")
if file_path:
state.current_file = file_path
state.files_processed += 1
update_progress(state)
elif event_type == "result":
cost = event.get("cost", {})
print(f"\n\nAnalysis complete!")
print(f"Total cost: ${cost.get('total_cost', 0):.4f}")
def handle_text(content: str, state: AnalysisState):
"""Handle text output, extracting issues."""
# Look for issue patterns
issue_keywords = ["vulnerability", "issue", "warning", "error", "risk"]
if any(kw in content.lower() for kw in issue_keywords):
state.issues.append({
"file": state.current_file,
"description": content.strip()
})
print(f"\n Issue: {content.strip()[:60]}...")
def update_progress(state: AnalysisState):
"""Update the progress display."""
percent = (state.files_processed / state.total_files * 100) if state.total_files else 0
bar_width = 30
filled = int(bar_width * percent / 100)
bar = "=" * filled + "-" * (bar_width - filled)
print(f"\r[{bar}] {percent:.0f}% ({state.files_processed}/{state.total_files})", end="")
def main():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--input", required=True, help="Directory to analyze")
args = parser.parse_args()
# Find files
from pathlib import Path
files = list(Path(args.input).glob("**/*.py"))
print(f"Found {len(files)} Python files to analyze\n")
# Run analysis
state = run_streaming_analysis([str(f) for f in files])
# Print summary
print("\n" + "=" * 50)
print("SUMMARY")
print("=" * 50)
print(f"Files analyzed: {state.files_processed}")
print(f"Issues found: {len(state.issues)}")
for issue in state.issues:
print(f" - {issue['file']}: {issue['description'][:50]}...")
if __name__ == "__main__":
main()
10.2 Async Version for Better Responsiveness
import asyncio
import json
async def run_async_analysis(files: list[str]):
"""Async version for better responsiveness."""
prompt = f"Analyze: {' '.join(files)}"
proc = await asyncio.create_subprocess_exec(
"claude", "-p", prompt, "--output-format", "stream-json",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
state = {"files": 0, "issues": []}
async for line in proc.stdout:
line = line.decode().strip()
if not line:
continue
try:
event = json.loads(line)
await process_event_async(event, state)
except json.JSONDecodeError:
pass
await proc.wait()
return state
async def process_event_async(event: dict, state: dict):
"""Async event processing."""
if event.get("type") == "text":
content = event.get("content", "")
print(content, end="", flush=True)
if "issue" in content.lower():
state["issues"].append(content)
# Run with: asyncio.run(run_async_analysis(files))
10.3 Rich Terminal Output
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.live import Live
from rich.table import Table
console = Console()
def run_with_rich_output(files: list[str]):
"""Use Rich library for beautiful terminal output."""
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
transient=True,
) as progress:
task = progress.add_task("Analyzing...", total=len(files))
# ... run analysis, update progress ...
        for result in stream_results():  # stream_results() is a placeholder for your event loop
progress.update(task, advance=1)
if result.has_issue:
console.print(f"[red]Issue:[/red] {result.description}")
11. Learning Milestones
| Milestone | Description | Verification |
|---|---|---|
| 1 | Stream events arrive in real-time | See events as they occur, not at end |
| 2 | Parse NDJSON correctly | Each line parsed independently |
| 3 | Progress updates during processing | Bar updates as files complete |
| 4 | Handle stream interruption | Ctrl+C gives partial results |
| 5 | Aggregate correctly | Final counts match event counts |
| 6 | Error recovery works | Malformed JSON doesn't crash |
12. Common Pitfalls
12.1 Blocking on Full Output
# WRONG: Waits for complete output
result = subprocess.run(["claude", "-p", "...", "--output-format", "stream-json"],
capture_output=True, text=True)
# By the time you get result.stdout, it's all there
# RIGHT: Stream as it arrives
proc = subprocess.Popen(["claude", "-p", "...", "--output-format", "stream-json"],
stdout=subprocess.PIPE, text=True)
for line in proc.stdout: # Yields lines as they arrive
process(line)
12.2 Not Flushing Output
# WRONG: Progress updates may buffer
print(f"\rProgress: {percent}%", end="")
# RIGHT: Flush to ensure immediate display
print(f"\rProgress: {percent}%", end="", flush=True)
# Or use sys.stdout directly
sys.stdout.write(f"\rProgress: {percent}%")
sys.stdout.flush()
12.3 Ignoring Parse Errors
# WRONG: Crash on malformed JSON
for line in proc.stdout:
event = json.loads(line) # May raise JSONDecodeError
# RIGHT: Handle gracefully
for line in proc.stdout:
try:
event = json.loads(line)
process(event)
except json.JSONDecodeError as e:
logging.warning(f"Skipped malformed line: {e}")
continue
12.4 Not Handling Partial Lines
# Blocking line iteration yields complete lines, but raw chunk reads
# (non-blocking or async) can end in the middle of a JSON object.
# WRONG: Parse a chunk as if it were a complete line
chunk = proc.stdout.read(4096)
event = json.loads(chunk)  # Chunk may end mid-object
# RIGHT: Buffer until a newline, then parse each complete line
buffer = ""
while chunk := proc.stdout.read(4096):
    buffer += chunk
    while "\n" in buffer:
        line, buffer = buffer.split("\n", 1)
        if line.strip():
            event = json.loads(line)
13. Extension Ideas
- WebSocket dashboard: Stream results to a web UI in real-time
- Multi-file correlation: Track issues across related files
- Severity aggregation: Weight issues by severity for summary
- Resume capability: Save checkpoints, resume interrupted analysis
- Parallel streams: Process multiple files with multiple Claude instances (see the asyncio sketch after this list)
- Metric recording: Track performance over time for optimization
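For the parallel-streams idea above, asyncio can multiplex several Claude subprocesses. A minimal sketch reusing the flags from this project (the per-file prompt and concurrency limit of 4 are illustrative):

```python
import asyncio
import json

async def analyze_one(file: str, sem: asyncio.Semaphore) -> list[dict]:
    """Run one streaming Claude process for a single file, bounded by a semaphore."""
    async with sem:
        proc = await asyncio.create_subprocess_exec(
            "claude", "-p", f"Analyze {file} for security issues",
            "--output-format", "stream-json",
            stdout=asyncio.subprocess.PIPE,
        )
        events = []
        async for raw in proc.stdout:
            line = raw.decode().strip()
            if not line:
                continue
            try:
                events.append(json.loads(line))
            except json.JSONDecodeError:
                continue  # skip malformed lines, as in the single-stream pipeline
        await proc.wait()
        return events

async def analyze_all(files: list[str], limit: int = 4) -> list[list[dict]]:
    sem = asyncio.Semaphore(limit)
    return await asyncio.gather(*(analyze_one(f, sem) for f in files))
```

Aggregation then happens per stream, with a final merge once `gather` returns.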
14. Summary
This project teaches you to:
- Use `--output-format stream-json` for real-time Claude output
- Parse NDJSON (newline-delimited JSON) incrementally
- Build responsive CLIs with live progress updates
- Handle streaming errors gracefully
- Aggregate results incrementally
The key insight is that streaming transforms Claude from a batch processor into an event source. Instead of waiting for complete results, you can react to each event as it arrives - enabling better user experience, faster feedback, and more resilient processing.
Key takeaway: Stream processing is about events, not files. Each JSON line is an event that tells you something about Claude's progress. Design your system around these events, and you'll build responsive, robust pipelines that handle large-scale analysis gracefully.