Advanced Code Example — Applied Integration

This example is a complete, production-style analytics pipeline that integrates every major concept from the course. It includes data ingestion, validation, transformation, testing, aggregation, visualization, and a reproducible output report.


Business Scenario

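You are building the monthly customer analytics pipeline for a retail chain. The pipeline must be testable, reproducible, and produce a consistent executive summary report from raw customer data. In production that data would arrive as a CSV export; in this example a small in-memory DataFrame stands in for the file.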


Code

import pandas as pd
import matplotlib.pyplot as plt
import json
from datetime import datetime

# ══════════════════════════════════════════════════════════
#  CONFIGURATION — single source of truth
# ══════════════════════════════════════════════════════════
CONFIG = dict(
    # Minimum total spend needed to reach each tier. Listed from highest to
    # lowest; code that walks this mapping in insertion order relies on that.
    tier_thresholds=dict(Platinum=1200, Gold=600, Silver=200),
    # Columns every incoming DataFrame must provide.
    required_columns=['customer_name', 'region', 'total_spent', 'purchase_count'],
    # Reporting period label shown in the console banner, chart title and report.
    report_date='2024-12',
    # Destination of the JSON report.
    output_path='/tmp/analytics_report.json',
)


# ══════════════════════════════════════════════════════════
#  STEP 1 — Data Ingestion and Validation
# ══════════════════════════════════════════════════════════
def load_and_validate(df: pd.DataFrame) -> pd.DataFrame:
    """Validate an incoming DataFrame against the required schema and clean it.

    Args:
        df: Raw customer data. It is not modified.

    Returns:
        An independent copy of ``df`` containing only rows that have a value
        in every column listed in ``CONFIG['required_columns']`` and a
        strictly positive ``total_spent``.

    Raises:
        AssertionError: If a required column is missing, or if ``total_spent``
            or ``purchase_count`` is not numeric.
    """
    required = CONFIG['required_columns']

    # Schema checks are raised explicitly instead of using ``assert`` so they
    # still run under ``python -O``; the exception type and messages are the
    # same as before.
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise AssertionError(f"Missing required columns: {missing}")

    for column in ('total_spent', 'purchase_count'):
        if not pd.api.types.is_numeric_dtype(df[column]):
            raise AssertionError(f"{column} must be numeric")

    # Remove rows with nulls in critical columns
    complete = df.dropna(subset=required)
    dropped = len(df) - len(complete)
    if dropped > 0:
        print(f"  ⚠  Dropped {dropped} rows with missing values")

    # Remove negative or zero spend. The copy is taken after the last filter
    # so the caller receives a standalone frame; adding columns to it later
    # does not trigger pandas' SettingWithCopyWarning.
    return complete[complete['total_spent'] > 0].copy()


# ══════════════════════════════════════════════════════════
#  STEP 2 — Transformation
# ══════════════════════════════════════════════════════════
def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Add derived columns and apply business classifications.

    Args:
        df: Validated customer data with numeric ``total_spent`` and
            ``purchase_count`` columns. It is not modified.

    Returns:
        A copy of ``df`` with two extra columns:

        * ``avg_purchase`` - ``total_spent / purchase_count`` rounded to two
          decimals; NaN where ``purchase_count`` is not positive, because the
          average is undefined there (plain division would give ``inf``).
        * ``tier`` - name of the highest tier in ``CONFIG['tier_thresholds']``
          whose threshold ``total_spent`` meets, or ``'Standard'`` if none.
    """
    # Rank tiers from highest to lowest threshold explicitly, so the first
    # match is always the best tier regardless of the order in CONFIG.
    ranked_tiers = sorted(
        CONFIG['tier_thresholds'].items(),
        key=lambda item: item[1],
        reverse=True,
    )

    def assign_tier(total: float) -> str:
        for tier, threshold in ranked_tiers:
            if total >= threshold:
                return tier
        return 'Standard'

    # Work on a copy so the caller's frame is left untouched.
    result = df.copy()
    counts = result['purchase_count']
    # ``where`` masks non-positive counts to NaN before dividing.
    result['avg_purchase'] = (result['total_spent'] / counts.where(counts > 0)).round(2)
    result['tier'] = result['total_spent'].apply(assign_tier)
    return result


# ══════════════════════════════════════════════════════════
#  STEP 3 — Unit Tests
# ══════════════════════════════════════════════════════════
def run_tests() -> bool:
    """Run unit tests on the tier classification. Returns True if all pass.

    The test amounts are pushed through the real ``transform`` function
    rather than a private copy of its tier rule, so a regression in the
    production logic is actually caught here. One line is printed per
    failing case, followed by a pass-count summary.
    """
    print("\n  Running unit tests...")

    # (total_spent, expected tier) pairs, including each tier's lower bound.
    test_cases = [
        (1500, 'Platinum'),
        (1200, 'Platinum'),
        (900,  'Gold'),
        (600,  'Gold'),
        (300,  'Silver'),
        (199,  'Standard'),
        (0,    'Standard'),
    ]

    amounts = [amount for amount, _ in test_cases]
    # purchase_count is irrelevant to the tier; 1 keeps the average defined.
    fixture = pd.DataFrame({
        'total_spent': amounts,
        'purchase_count': [1] * len(amounts),
    })
    actual_tiers = transform(fixture)['tier'].tolist()

    passed = 0
    for (amount, expected), result in zip(test_cases, actual_tiers):
        if result == expected:
            passed += 1
        else:
            print(f"    ✗ FAIL: assign_tier({amount}) = '{result}', expected '{expected}'")

    print(f"  Tests: {passed}/{len(test_cases)} passed")
    return passed == len(test_cases)


# ══════════════════════════════════════════════════════════
#  STEP 4 — Analysis and Aggregation
# ══════════════════════════════════════════════════════════
def analyze(df: pd.DataFrame) -> dict:
    """Summarize the transformed customer data.

    Returns a dict holding the overall revenue, customer count and mean
    customer value (monetary figures rounded to two decimals), plus two
    DataFrames: per-region and per-tier aggregates.
    """
    spend = df['total_spent']

    def summarize_by(key: str, **named_aggs) -> pd.DataFrame:
        # Group on ``key``, apply the named aggregations, round the numeric
        # results and turn the group key back into an ordinary column.
        grouped = df.groupby(key).agg(**named_aggs)
        return grouped.round(2).reset_index()

    by_region = summarize_by(
        'region',
        revenue=('total_spent', 'sum'),
        customers=('customer_name', 'count'),
        avg_spent=('total_spent', 'mean'),
    )
    by_tier = summarize_by(
        'tier',
        count=('customer_name', 'count'),
        revenue=('total_spent', 'sum'),
    )

    summary = {}
    summary['total_revenue'] = round(spend.sum(), 2)
    summary['total_customers'] = len(df)
    summary['avg_customer_value'] = round(spend.mean(), 2)
    summary['regional'] = by_region
    summary['tier_distribution'] = by_tier
    return summary


# ══════════════════════════════════════════════════════════
#  STEP 5 — Visualization
# ══════════════════════════════════════════════════════════
def visualize(results: dict, chart_path: str = '/tmp/analytics_chart.png') -> None:
    """Generate the summary visualization and save it as a PNG.

    Draws a two-panel figure: revenue per region (vertical bars) and customer
    count per tier (horizontal bars).

    Args:
        results: Mapping with a ``'regional'`` DataFrame (columns ``region``
            and ``revenue``) and a ``'tier_distribution'`` DataFrame (columns
            ``tier`` and ``count``).
        chart_path: Where the PNG is written. Defaults to the location the
            pipeline has always used.
    """
    NAVY, RED = '#041E42', '#C8102E'

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
    try:
        fig.suptitle(f"Customer Analytics — {CONFIG['report_date']}",
                     fontweight='bold', color=NAVY, fontsize=14)

        regional = results['regional']
        ax1.bar(regional['region'], regional['revenue'], color=NAVY)
        ax1.set_title('Revenue by Region', color=NAVY, fontweight='bold')
        ax1.set_ylabel('Revenue ($)')
        ax1.tick_params(axis='x', rotation=15)

        tier_dist = results['tier_distribution']
        ax2.barh(tier_dist['tier'], tier_dist['count'], color=RED)
        ax2.set_title('Customers by Tier', color=NAVY, fontweight='bold')
        ax2.set_xlabel('Customer Count')
        # barh draws the first row at the bottom; flip so it appears on top.
        ax2.invert_yaxis()

        plt.tight_layout()
        # Save before show(): the file is written before any display step.
        fig.savefig(chart_path, dpi=120, bbox_inches='tight')
        plt.show()
    finally:
        # pyplot keeps every figure alive until it is closed; release this one
        # so repeated pipeline runs do not accumulate open figures.
        plt.close(fig)
    print(f"  Chart saved to {chart_path}")


# ══════════════════════════════════════════════════════════
#  STEP 6 — Reproducible Output
# ══════════════════════════════════════════════════════════
def save_report(results: dict) -> None:
    """Save pipeline results as a JSON report at ``CONFIG['output_path']``.

    Args:
        results: Mapping with the scalar keys ``total_revenue``,
            ``total_customers`` and ``avg_customer_value`` and the DataFrames
            ``regional`` and ``tier_distribution``.

    Raises:
        TypeError: If the report contains a value that is neither
            JSON-native nor a numpy-style scalar.
    """
    def _to_native(value):
        # The json module rejects numpy scalars such as int64; they expose
        # ``.item()``, which returns the equivalent built-in Python value.
        item = getattr(value, 'item', None)
        if callable(item):
            return item()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    report = {
        # Local wall-clock time of this run.
        'generated_at': datetime.now().isoformat(),
        'report_period': CONFIG['report_date'],
        # Thresholds are stored alongside the numbers they produced.
        'config': CONFIG['tier_thresholds'],
        'summary': {
            'total_revenue': results['total_revenue'],
            'total_customers': results['total_customers'],
            'avg_customer_value': results['avg_customer_value'],
        },
        'regional': results['regional'].to_dict(orient='records'),
        'tier_distribution': results['tier_distribution'].to_dict(orient='records'),
    }
    # Explicit encoding keeps the file independent of the platform default.
    with open(CONFIG['output_path'], 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2, default=_to_native)
    print(f"  Report saved: {CONFIG['output_path']}")


# ══════════════════════════════════════════════════════════
#  PIPELINE EXECUTION
# ══════════════════════════════════════════════════════════
# Sample input. The last row has no customer name and no spend value, so the
# validation step drops it.
raw_data = pd.DataFrame({
    'customer_name': ['Alice Johnson', 'Bob Martinez', 'Carol Chen',
                      'David Kim', 'Eve Torres', 'Frank Li', 'Grace Park', None],
    'region':        ['Northwest', 'Southwest', 'Northwest',
                      'Southeast', 'Southwest', 'Northeast', 'Northwest', 'Northwest'],
    'total_spent':   [1257.30, 430.50, 890.75, 125.00, 1450.00, 675.25, 980.00, None],
    'purchase_count':[12, 4, 9, 2, 15, 7, 11, 0]
})

print("=" * 58)
print("  MSBA 604 — ANALYTICS PIPELINE")
print(f"  Period: {CONFIG['report_date']}")
print("=" * 58)

# Run tests first. The gate is raised explicitly rather than with ``assert``
# so that it still stops the pipeline under ``python -O``, which strips
# assert statements.
all_tests_passed = run_tests()
if not all_tests_passed:
    raise AssertionError("Tests failed — fix logic before running pipeline")

# Execute pipeline
print("\n  Step 1: Load and validate")
clean_df = load_and_validate(raw_data)
print(f"    Records after validation: {len(clean_df)}")

print("\n  Step 2: Transform")
transformed_df = transform(clean_df)

print("\n  Step 3: Analyze")
results = analyze(transformed_df)
print(f"    Total Revenue    : ${results['total_revenue']:,.2f}")
print(f"    Total Customers  : {results['total_customers']}")
print(f"    Avg Cust. Value  : ${results['avg_customer_value']:,.2f}")

print("\n  Step 4: Visualize")
visualize(results)

print("\n  Step 5: Save Report")
save_report(results)

print("\n" + "=" * 58)
print("  PIPELINE COMPLETE — All steps succeeded")
print("=" * 58)

Expected Output

==========================================================
  MSBA 604 — ANALYTICS PIPELINE
  Period: 2024-12
==========================================================

  Running unit tests...
  Tests: 7/7 passed

  Step 1: Load and validate
  ⚠  Dropped 1 rows with missing values
    Records after validation: 7

  Step 2: Transform

  Step 3: Analyze
    Total Revenue    : $5,808.80
    Total Customers  : 7
    Avg Cust. Value  : $829.83

  Step 4: Visualize
  Chart saved to /tmp/analytics_chart.png

  Step 5: Save Report
  Report saved: /tmp/analytics_report.json

==========================================================
  PIPELINE COMPLETE — All steps succeeded
==========================================================

Next: Capstone Notebook →