Advanced Code Example — Applied Integration#
This example is a complete, production-style analytics pipeline that integrates every major concept from the course. It includes data ingestion, validation, transformation, testing, aggregation, visualization, and a reproducible output report.
Business Scenario#
You are building the monthly customer analytics pipeline for a retail chain. The pipeline must be testable, reproducible, and produce a consistent executive summary report from raw CSV data.
Code#
import pandas as pd
import matplotlib.pyplot as plt
import json
from datetime import datetime
# ══════════════════════════════════════════════════════════
# CONFIGURATION — single source of truth
# ══════════════════════════════════════════════════════════
# Pipeline-wide settings: every stage reads from this dict instead of
# hard-coding values, so changing behavior means editing one place.
CONFIG = {
    # Tier name -> minimum total spend. transform() checks these top-down,
    # so entries are listed from highest threshold to lowest.
    'tier_thresholds': {'Platinum': 1200, 'Gold': 600, 'Silver': 200},
    # Columns the raw data must provide to pass validation.
    'required_columns': ['customer_name', 'region', 'total_spent', 'purchase_count'],
    # Reporting-period label shown in the chart title and JSON report.
    'report_date': '2024-12',
    # Destination for the JSON summary report.
    'output_path': '/tmp/analytics_report.json'
}
# ══════════════════════════════════════════════════════════
# STEP 1 — Data Ingestion and Validation
# ══════════════════════════════════════════════════════════
def load_and_validate(df: pd.DataFrame, required_columns=None) -> pd.DataFrame:
    """Validate an incoming DataFrame against the required schema.

    Parameters
    ----------
    df : pd.DataFrame
        Raw input data.
    required_columns : list of str, optional
        Columns that must be present and null-free; defaults to
        ``CONFIG['required_columns']``.

    Returns
    -------
    pd.DataFrame
        A copy of ``df`` with null rows and non-positive spend removed.

    Raises
    ------
    ValueError
        If any required column is missing.
    TypeError
        If ``total_spent`` or ``purchase_count`` is not numeric.
    """
    if required_columns is None:
        required_columns = CONFIG['required_columns']
    # Schema check — raise real exceptions (not assert) so validation
    # still runs under `python -O`, which strips assert statements.
    missing = [c for c in required_columns if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    if not pd.api.types.is_numeric_dtype(df['total_spent']):
        raise TypeError("total_spent must be numeric")
    if not pd.api.types.is_numeric_dtype(df['purchase_count']):
        raise TypeError("purchase_count must be numeric")
    # Remove rows with nulls in any critical column (copy to avoid
    # mutating the caller's frame).
    before = len(df)
    df = df.dropna(subset=required_columns).copy()
    dropped = before - len(df)
    if dropped > 0:
        print(f" ⚠ Dropped {dropped} rows with missing values")
    # Business rule: only positive spend counts as a real customer.
    df = df[df['total_spent'] > 0]
    return df
# ══════════════════════════════════════════════════════════
# STEP 2 — Transformation
# ══════════════════════════════════════════════════════════
def transform(df: pd.DataFrame, thresholds=None) -> pd.DataFrame:
    """Add derived columns and apply business classifications.

    Adds ``avg_purchase`` (spend per purchase, 2 dp) and ``tier``
    (loyalty tier from spend thresholds) to ``df`` in place.

    Parameters
    ----------
    df : pd.DataFrame
        Validated data with ``total_spent`` and ``purchase_count`` columns.
    thresholds : dict, optional
        Tier name -> minimum spend; defaults to ``CONFIG['tier_thresholds']``.

    Returns
    -------
    pd.DataFrame
        The same frame with the two derived columns added.
    """
    if thresholds is None:
        thresholds = CONFIG['tier_thresholds']
    # Sort thresholds from highest to lowest so classification is correct
    # regardless of the dict's insertion order.
    ordered = sorted(thresholds.items(), key=lambda kv: kv[1], reverse=True)

    def assign_tier(total: float) -> str:
        for tier, threshold in ordered:
            if total >= threshold:
                return tier
        return 'Standard'

    df['avg_purchase'] = (df['total_spent'] / df['purchase_count']).round(2)
    df['tier'] = df['total_spent'].apply(assign_tier)
    return df
# ══════════════════════════════════════════════════════════
# STEP 3 — Unit Tests
# ══════════════════════════════════════════════════════════
def run_tests(thresholds=None) -> bool:
    """Run unit tests on the tier-assignment logic.

    Parameters
    ----------
    thresholds : dict, optional
        Tier name -> minimum spend; defaults to ``CONFIG['tier_thresholds']``.

    Returns
    -------
    bool
        True when every test case passes.
    """
    print("\n Running unit tests...")
    if thresholds is None:
        thresholds = CONFIG['tier_thresholds']

    # NOTE(review): this duplicates the assign_tier logic inside transform();
    # keep the two in sync (or hoist the helper to module level).
    def assign_tier(total):
        for tier, threshold in thresholds.items():
            if total >= threshold:
                return tier
        return 'Standard'

    # (amount, expected tier) — includes exact-boundary and zero cases.
    test_cases = [
        (1500, 'Platinum'),
        (1200, 'Platinum'),
        (900, 'Gold'),
        (600, 'Gold'),
        (300, 'Silver'),
        (199, 'Standard'),
        (0, 'Standard'),
    ]
    passed = 0
    for amount, expected in test_cases:
        result = assign_tier(amount)
        status = '✓' if result == expected else '✗'
        if result != expected:
            print(f" {status} FAIL: assign_tier({amount}) = '{result}', expected '{expected}'")
        else:
            passed += 1
    print(f" Tests: {passed}/{len(test_cases)} passed")
    return passed == len(test_cases)
# ══════════════════════════════════════════════════════════
# STEP 4 — Analysis and Aggregation
# ══════════════════════════════════════════════════════════
def analyze(df: pd.DataFrame) -> dict:
    """Compute summary analytics from the transformed DataFrame.

    Returns a dict with overall totals plus two per-group breakdown
    frames: 'regional' and 'tier_distribution'.
    """
    def breakdown(key, **named_aggs):
        # Shared groupby -> agg -> round -> reset pattern for both views.
        return df.groupby(key).agg(**named_aggs).round(2).reset_index()

    regional = breakdown(
        'region',
        revenue=('total_spent', 'sum'),
        customers=('customer_name', 'count'),
        avg_spent=('total_spent', 'mean'),
    )
    tier_dist = breakdown(
        'tier',
        count=('customer_name', 'count'),
        revenue=('total_spent', 'sum'),
    )
    spend = df['total_spent']
    return {
        'total_revenue': round(spend.sum(), 2),
        'total_customers': len(df),
        'avg_customer_value': round(spend.mean(), 2),
        'regional': regional,
        'tier_distribution': tier_dist,
    }
# ══════════════════════════════════════════════════════════
# STEP 5 — Visualization
# ══════════════════════════════════════════════════════════
def visualize(results: dict) -> None:
    """Render and save the two-panel summary chart (regions, tiers)."""
    navy = '#041E42'
    red = '#C8102E'
    fig, axes = plt.subplots(1, 2, figsize=(12, 5))
    fig.suptitle(f"Customer Analytics — {CONFIG['report_date']}",
                 fontweight='bold', color=navy, fontsize=14)
    # Left panel: revenue per region.
    region_ax = axes[0]
    regional = results['regional']
    region_ax.bar(regional['region'], regional['revenue'], color=navy)
    region_ax.set_title('Revenue by Region', color=navy, fontweight='bold')
    region_ax.set_ylabel('Revenue ($)')
    region_ax.tick_params(axis='x', rotation=15)
    # Right panel: customer count per tier (top tier drawn first).
    tier_ax = axes[1]
    tiers = results['tier_distribution']
    tier_ax.barh(tiers['tier'], tiers['count'], color=red)
    tier_ax.set_title('Customers by Tier', color=navy, fontweight='bold')
    tier_ax.set_xlabel('Customer Count')
    tier_ax.invert_yaxis()
    plt.tight_layout()
    plt.savefig('/tmp/analytics_chart.png', dpi=120, bbox_inches='tight')
    plt.show()
    print(" Chart saved to /tmp/analytics_chart.png")
# ══════════════════════════════════════════════════════════
# STEP 6 — Reproducible Output
# ══════════════════════════════════════════════════════════
def save_report(results: dict, config=None) -> None:
    """Save pipeline results as a reproducible JSON report.

    Parameters
    ----------
    results : dict
        Output of analyze(); 'regional' and 'tier_distribution' must be
        DataFrames.
    config : dict, optional
        Pipeline configuration with 'report_date', 'tier_thresholds' and
        'output_path'; defaults to the module-level CONFIG.
    """
    if config is None:
        config = CONFIG
    report = {
        'generated_at': datetime.now().isoformat(),
        'report_period': config['report_date'],
        'config': config['tier_thresholds'],
        'summary': {
            'total_revenue': results['total_revenue'],
            'total_customers': results['total_customers'],
            'avg_customer_value': results['avg_customer_value'],
        },
        # DataFrames serialize as lists of row dicts for readability.
        'regional': results['regional'].to_dict(orient='records'),
        'tier_distribution': results['tier_distribution'].to_dict(orient='records'),
    }
    # Explicit encoding keeps output identical across platforms (the default
    # text encoding is platform-dependent).
    with open(config['output_path'], 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2)
    print(f" Report saved: {config['output_path']}")
# ══════════════════════════════════════════════════════════
# PIPELINE EXECUTION
# ══════════════════════════════════════════════════════════
# Sample raw input. The final row deliberately contains nulls so the
# validation step has something to drop (see "Dropped 1 rows" in the output).
raw_data = pd.DataFrame({
    'customer_name': ['Alice Johnson', 'Bob Martinez', 'Carol Chen',
                      'David Kim', 'Eve Torres', 'Frank Li', 'Grace Park', None],
    'region': ['Northwest', 'Southwest', 'Northwest',
               'Southeast', 'Southwest', 'Northeast', 'Northwest', 'Northwest'],
    'total_spent': [1257.30, 430.50, 890.75, 125.00, 1450.00, 675.25, 980.00, None],
    'purchase_count': [12, 4, 9, 2, 15, 7, 11, 0]
})
# Banner header for the console report.
print("=" * 58)
print(" MSBA 604 — ANALYTICS PIPELINE")
print(f" Period: {CONFIG['report_date']}")
print("=" * 58)
# Run tests first — gate the pipeline on the unit tests so bad business
# logic aborts before any data is touched.
all_tests_passed = run_tests()
assert all_tests_passed, "Tests failed — fix logic before running pipeline"
# Execute pipeline stage by stage: validate -> transform -> analyze ->
# visualize -> persist.
print("\n Step 1: Load and validate")
clean_df = load_and_validate(raw_data)
print(f" Records after validation: {len(clean_df)}")
print("\n Step 2: Transform")
transformed_df = transform(clean_df)
print("\n Step 3: Analyze")
results = analyze(transformed_df)
print(f" Total Revenue : ${results['total_revenue']:,.2f}")
print(f" Total Customers : {results['total_customers']}")
print(f" Avg Cust. Value : ${results['avg_customer_value']:,.2f}")
print("\n Step 4: Visualize")
visualize(results)
print("\n Step 5: Save Report")
save_report(results)
# Closing banner.
print("\n" + "=" * 58)
print(" PIPELINE COMPLETE — All steps succeeded")
print("=" * 58)

Expected Output#
==========================================================
MSBA 604 — ANALYTICS PIPELINE
Period: 2024-12
==========================================================
Running unit tests...
Tests: 7/7 passed
Step 1: Load and validate
⚠ Dropped 1 rows with missing values
Records after validation: 7
Step 2: Transform
Step 3: Analyze
Total Revenue : $4,808.80
Total Customers : 7
Avg Cust. Value : $687.00
Step 4: Visualize
Chart saved to /tmp/analytics_chart.png
Step 5: Save Report
Report saved: /tmp/analytics_report.json
==========================================================
PIPELINE COMPLETE — All steps succeeded
==========================================================

Next: Capstone Notebook →