Ingest Data from a REST API
Goal: Build a reliable ingestion pipeline that pulls data from a REST API, handles pagination and rate limits, and stores both raw and curated data. You will learn incremental ingestion, schema validation, and operational safeguards. By the end, you will be able to demonstrate that your ingestion is complete, accurate, and traceable.
Context and Problem
- Real-world scenario: A company needs to ingest customer data from a third-party CRM API.
- Stakeholders and constraints: Sales needs up-to-date customer data; Engineering needs a stable pipeline. Constraints include rate limits, schema changes, and authentication.
- What happens if this system fails? Customer data becomes stale and downstream analytics drift.
Real World Outcome
- You run an ingestion script that writes raw JSON and a staging table.
- Example CLI transcript:
$ python ingest_crm.py --since 2024-05-01 --out data/raw/crm_2024-05-01.jsonl
Fetched pages=24 records=11874
Stored raw file=data/raw/crm_2024-05-01.jsonl
Loaded staging rows=11874
- Example raw JSONL record:
{"id":"cust_001","email":"aline@example.com","status":"active","created_at":"2023-02-14T09:22:10Z"}
Core Concepts
- Pagination and rate limits: Reliable page-by-page fetching.
- Incremental loads: Using updated_at or cursor for delta ingestion.
- Schema validation: Ensuring required fields exist and types are correct.
- Data quality dimensions: Accuracy, completeness, consistency, timeliness, traceability.
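The page-by-page fetching described above can be sketched as a cursor loop. This is a minimal sketch: `fetch_page` below is a stand-in for a real HTTP call (a production client would add auth, rate-limit handling, and retries), and the cursor shape and page size are assumptions.

```python
# Minimal cursor-based pagination loop.
# fetch_page is a fake API stand-in: it returns (records, next_cursor),
# where next_cursor is None on the last page.
def fetch_page(cursor=None, page_size=2):
    data = [{"id": f"cust_{i:03d}"} for i in range(1, 6)]  # 5 fake records
    start = cursor or 0
    page = data[start:start + page_size]
    next_cursor = start + page_size if start + page_size < len(data) else None
    return page, next_cursor

def fetch_all():
    # Accumulate pages until the API stops returning a cursor.
    records, cursor = [], None
    while True:
        page, cursor = fetch_page(cursor)
        records.extend(page)
        if cursor is None:
            return records

print(len(fetch_all()))  # 5 records fetched across 3 pages
```

The same loop shape works for offset-based pagination; only the cursor bookkeeping changes.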
Architecture
+---------------------+     +---------------------+     +---------------------+
|    CRM REST API     | --> | Fetcher + Validator | --> | Raw JSONL + Staging |
+---------------------+     +---------------------+     +---------------------+
                                  |           |
                                  v           v
                         retry/backoff    rejected_records.jsonl
Data Model
- Raw storage: JSONL file per run
- Staging table: stg_crm_customers
- id TEXT
- email TEXT
- status TEXT
- created_at TEXT
- updated_at TEXT
- ingestion_run_id TEXT
- Example record:
id=cust_001 email=aline@example.com status=active created_at=2023-02-14T09:22:10Z updated_at=2024-05-02T11:00:00Z ingestion_run_id=20240502_0001
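The staging table above can be created and loaded with a few lines of SQL. A minimal sketch using an in-memory SQLite database as a stand-in for the real staging store (the production warehouse's SQL dialect may differ):

```python
import sqlite3

# In-memory SQLite stand-in for the real staging store.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE stg_crm_customers (
        id TEXT, email TEXT, status TEXT,
        created_at TEXT, updated_at TEXT, ingestion_run_id TEXT
    )
""")
# Insert the example record from the data model above.
conn.execute(
    "INSERT INTO stg_crm_customers VALUES (?, ?, ?, ?, ?, ?)",
    ("cust_001", "aline@example.com", "active",
     "2023-02-14T09:22:10Z", "2024-05-02T11:00:00Z", "20240502_0001"),
)
rows = conn.execute("SELECT COUNT(*) FROM stg_crm_customers").fetchone()[0]
print(rows)  # 1
```

Keeping every column TEXT in staging defers type coercion to the curated layer, so a malformed upstream value never blocks the load.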
Implementation Plan
- Implement API client with auth (API key or OAuth).
- Fetch data using pagination; respect rate limits with backoff.
- Store raw responses in JSONL for traceability.
- Validate schema and load into staging table.
- Track ingestion runs with a run_id and row counts.
- Support incremental loads via the --since parameter.
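Run tracking and raw JSONL writes from the plan above can be sketched as follows. The run_id format and the one-object-per-line layout are assumptions; an in-memory buffer stands in for the real output file.

```python
import io
import json
from datetime import datetime, timezone

def write_raw(records, out):
    # Write one JSON object per line (JSONL) and return the row count,
    # which feeds the run log for completeness checks.
    count = 0
    for rec in records:
        out.write(json.dumps(rec, sort_keys=True) + "\n")
        count += 1
    return count

# Assumed run_id convention: UTC timestamp, e.g. 20240502_110000.
run_id = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
buf = io.StringIO()  # stand-in for open("data/raw/....jsonl", "w")
n = write_raw([{"id": "cust_001"}, {"id": "cust_002"}], buf)
print(n)  # 2
```

Logging the returned count alongside the run_id gives you the "Fetched ... records=..." line from the CLI transcript for free.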
Validation and Data Quality
- Accuracy: Validate field types and required fields (id, email).
- Completeness: Count of records equals API-reported total.
- Consistency: No duplicate ids within the same run.
- Timeliness: latest updated_at <= now and >= the --since cutoff.
- Traceability: store run_id and raw JSON line hash.
Example SQL checks:
SELECT COUNT(*) FROM stg_crm_customers WHERE id IS NULL OR email IS NULL;
SELECT COUNT(*) - COUNT(DISTINCT id) AS duplicate_ids FROM stg_crm_customers;
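The accuracy and traceability checks can also run in code before the load. A minimal validator sketch: the required-field list follows the data model above, but the rejection format and the use of SHA-256 for the raw-line hash are assumptions.

```python
import hashlib
import json

# Required fields and their expected types, per the staging data model.
REQUIRED = {"id": str, "email": str}

def validate(record):
    # Returns (ok, errors): every required field must exist with the
    # expected type; failing records go to rejected_records.jsonl.
    errors = [f for f, t in REQUIRED.items()
              if f not in record or not isinstance(record[f], t)]
    return (not errors, errors)

def line_hash(raw_line):
    # Stable hash of the raw JSONL line, stored for traceability.
    return hashlib.sha256(raw_line.encode("utf-8")).hexdigest()

ok, _ = validate({"id": "cust_001", "email": "aline@example.com"})
print(ok)  # True
bad, errs = validate({"id": "cust_002"})
print(bad, errs)  # False ['email']
```

Hashing the raw line (not the parsed object) means the stored hash still matches the bytes on disk even if parsing rules change later.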
Failure Modes and Debugging
- Rate limit exceeded: API returns 429.
  - Symptom: partial ingestion. Fix: retry with exponential backoff and jitter.
- Schema change: a new field appears or a field's type changes.
  - Symptom: validation failures. Fix: version the schema and parse unknown fields leniently.
- Partial page failure: network error mid-run.
  - Symptom: missing records. Fix: resume from the last successful page cursor and keep writes idempotent.
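The backoff-and-jitter fix for 429 responses can be sketched as below. The base delay and cap values are assumptions; this uses "full jitter" (a uniform draw up to the exponential bound) so concurrent workers do not retry in lockstep.

```python
import random

def backoff_delay(attempt, base=1.0, cap=60.0):
    # Exponential backoff capped at `cap` seconds, with full jitter:
    # sleep a random duration in [0, min(cap, base * 2**attempt)].
    return random.uniform(0, min(cap, base * 2 ** attempt))

delays = [backoff_delay(a) for a in range(5)]
print(all(0 <= d <= 60 for d in delays))  # True
```

In the fetch loop, call time.sleep(backoff_delay(attempt)) after each 429 and give up after a fixed attempt budget so a stuck run fails loudly instead of hanging.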
Definition of Done
- Ingestion script supports full and incremental runs.
- Raw JSONL and staging table are both produced.
- Row counts match API totals.
- Data quality checks pass with zero null keys.
- Failures are recoverable with documented retries.