Advanced Code Example — Files, APIs, and Data Ingestion#
This example demonstrates reading and writing CSV and JSON files, simulating an API response pattern, and building a validated ingestion pipeline that handles real-world data quality issues.
Business Scenario#
You are building a customer data ingestion pipeline that:
- Reads raw customer data from a CSV file
- Validates and enriches each record
- Saves processed data as JSON for downstream analysis
- Simulates pulling supplemental data from an API
- Merges and reports on the combined dataset
Code#
import json
import csv
import io
# ── Simulated CSV Source Data ─────────────────────────────────────────
# (In real use, you'd do: with open('customers.csv', 'r') as f: reader = csv.DictReader(f))
# Deliberately dirty data: Bob's total_spent is the literal string 'N/A'
# and David's is empty — both must be rejected by validation downstream.
raw_csv = """name,customer_id,region,total_spent,purchase_count,is_premium
Alice Johnson,1001,Northwest,1257.30,12,True
Bob Martinez,1002,Southwest,N/A,4,False
Carol Chen,1003,Northwest,890.75,9,True
David Kim,1004,Southeast,,2,False
Eve Torres,1005,Southwest,1450.00,15,True
"""
# ── Simulated API Response ────────────────────────────────────────────
# (In real use: import requests; response = requests.get(url); data = response.json())
# Note: customer 1004 is intentionally absent from the API payload, so
# the enrichment step must cope with records that have no API match.
api_response = {
    "status": "success",
    "customers": [
        {"customer_id": 1001, "loyalty_score": 92, "churn_risk": "low"},
        {"customer_id": 1002, "loyalty_score": 41, "churn_risk": "high"},
        {"customer_id": 1003, "loyalty_score": 78, "churn_risk": "medium"},
        {"customer_id": 1005, "loyalty_score": 95, "churn_risk": "low"},
    ]
}
# ── Ingestion: Read + Validate CSV ────────────────────────────────────
def parse_customer_csv(raw_text: str) -> tuple[list, list]:
    """Parse CSV text into validated customer records.

    Each row must supply customer_id (int), total_spent (non-negative
    float), purchase_count (int), and is_premium ('true'/'false',
    case-insensitive). Bad rows are collected rather than aborting the
    whole parse.

    Args:
        raw_text: CSV content with a header row.

    Returns:
        (valid_records, errors) — valid_records is a list of cleaned
        customer dicts; errors is a list of {'name', 'error'} dicts for
        every row that failed validation.
    """
    reader = csv.DictReader(io.StringIO(raw_text))
    valid_records = []
    errors = []
    for row in reader:
        # A short row leaves missing fields as None; coalesce before
        # .strip() so the name lookup itself can't crash the parse.
        name = (row.get('name') or 'UNKNOWN').strip()
        try:
            customer_id = int(row['customer_id'])
            # float() raises ValueError for '' or 'N/A'; None (short row)
            # raises TypeError — both are caught below as row errors.
            total_spent = float(row['total_spent'])
            purchase_count = int(row['purchase_count'])
            is_premium = row['is_premium'].strip().lower() == 'true'
            if total_spent < 0:
                raise ValueError(f"Negative total_spent: {total_spent}")
            valid_records.append({
                'name': name,
                'customer_id': customer_id,
                'region': row['region'].strip(),
                'total_spent': round(total_spent, 2),
                'purchase_count': purchase_count,
                'is_premium': is_premium
            })
        # TypeError/AttributeError cover None values from short rows,
        # which the original (ValueError, KeyError) pair let escape.
        except (ValueError, TypeError, KeyError, AttributeError) as e:
            errors.append({'name': name, 'error': str(e)})
    return valid_records, errors
# ── Enrich with API Data ──────────────────────────────────────────────
def enrich_with_api(customers: list, api_data: dict) -> list:
    """Merge API loyalty/churn data into customer records."""
    # Index the API payload by customer_id so each merge is an O(1) lookup.
    by_id = {entry['customer_id']: entry for entry in api_data.get('customers', [])}

    def _merged(record: dict) -> dict:
        # Records with no API match get None / 'unknown' placeholders.
        extra = by_id.get(record['customer_id'], {})
        return {
            **record,
            'loyalty_score': extra.get('loyalty_score', None),
            'churn_risk': extra.get('churn_risk', 'unknown'),
        }

    return [_merged(record) for record in customers]
# ── Save to JSON ──────────────────────────────────────────────────────
def save_to_json(data: list, filepath: str) -> None:
    """Save processed records to a JSON file.

    Writes UTF-8 explicitly (the platform default encoding varies) and
    keeps non-ASCII characters readable instead of \\u-escaping them.

    Args:
        data: list of JSON-serializable records.
        filepath: destination path; overwritten if it exists.
    """
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    print(f" Saved {len(data)} records to {filepath}")
# ── Load from JSON ────────────────────────────────────────────────────
def load_from_json(filepath: str) -> list:
    """Load records from a JSON file.

    Reads with explicit UTF-8 to match save_to_json and avoid
    platform-dependent default encodings.

    Args:
        filepath: path to a JSON file containing a list of records.

    Returns:
        The deserialized list of records.
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        return json.load(f)
# ══════════════════════════════════════════════════════════
# PIPELINE EXECUTION
# ══════════════════════════════════════════════════════════
print("=" * 58)
print(" DATA INGESTION PIPELINE")
print("=" * 58)

# Step 1: Parse CSV — invalid rows are reported and skipped, not fatal.
customers, errors = parse_customer_csv(raw_csv)
print(f"\n Step 1 — CSV Parsing:")
print(f" Valid records : {len(customers)}")
print(f" Errors skipped : {len(errors)}")
for err in errors:
    print(f" ✗ {err['name']}: {err['error']}")

# Step 2: Enrich with supplemental API data keyed by customer_id.
enriched = enrich_with_api(customers, api_response)
print(f"\n Step 2 — API Enrichment:")
print(f" Records enriched: {len(enriched)}")

# Step 3: Persist the merged dataset for downstream analysis.
save_to_json(enriched, '/tmp/customers_processed.json')

# Step 4: Round-trip — load the file back and print a summary report.
loaded = load_from_json('/tmp/customers_processed.json')
print(f"\n Step 4 — Loaded from JSON ({len(loaded)} records):")
print()
print(f" {'Name':<20} {'Region':<12} {'Spent':>10} {'Churn':>8} {'Loyalty':>8}")
print(f" {'-'*20} {'-'*12} {'-'*10} {'-'*8} {'-'*8}")
for c in loaded:
    # Compare against None explicitly: a legitimate loyalty_score of 0
    # is falsy and would otherwise be misreported as 'N/A'.
    loyalty = f"{c['loyalty_score']}/100" if c['loyalty_score'] is not None else 'N/A'
    print(f" {c['name']:<20} {c['region']:<12} ${c['total_spent']:>9,.2f} {c['churn_risk']:>8} {loyalty:>8}")
print("=" * 58)

Expected Output#
==========================================================
DATA INGESTION PIPELINE
==========================================================
Step 1 — CSV Parsing:
Valid records : 3
Errors skipped : 2
✗ Bob Martinez: could not convert string to float: 'N/A'
✗ David Kim: could not convert string to float: ''
Step 2 — API Enrichment:
Records enriched: 3
Saved 3 records to /tmp/customers_processed.json
Step 4 — Loaded from JSON (3 records):
Name Region Spent Churn Loyalty
-------------------- ------------ ---------- -------- --------
Alice Johnson Northwest $1,257.30 low 92/100
Carol Chen Northwest $890.75 medium 78/100
Eve Torres Southwest $1,450.00 low 95/100
==========================================================

Key Concepts Demonstrated#
| Concept | Where in Code |
|---|---|
| `csv.DictReader` | Row-as-dictionary CSV reading |
| `io.StringIO` | Reading a string as a file object |
| `with open(...)` context manager | File read and write |
| `json.dump` / `json.load` | Serializing/deserializing JSON |
| Dict comprehension for API lookup | `{c['customer_id']: c for c in ...}` |
| `**customer` dict unpacking | Merging records in `enrich_with_api()` |
| `.get()` with fallback | `api_info.get('loyalty_score', None)` |
Next: Jupyter Notebook →