#!/usr/bin/env python3
"""Build bronze, silver, and gold Customer 360 data products.

This implementation deliberately uses only Python standard libraries so the
case-study project can run on a clean workstation. The logic mirrors a
production CDC and streaming lakehouse pipeline: raw source snapshots are copied
into bronze, cleaned entities are written to silver, and serving-ready Customer
360 aggregates are written to gold.
"""

from __future__ import annotations

import csv
import shutil
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from statistics import mean

# Project root: the directory one level above the folder containing this script.
BASE = Path(__file__).resolve().parents[1]
# One directory per medallion layer under <BASE>/data.
RAW, BRONZE, SILVER, GOLD = (BASE / "data" / layer for layer in ("raw", "bronze", "silver", "gold"))

# Output layers are created at import time so main() can write into them directly.
for directory in (BRONZE, SILVER, GOLD):
    directory.mkdir(parents=True, exist_ok=True)

# Pinned reference time for days_since_last_order (instead of datetime.now()),
# so reruns over the same raw files produce the same outputs.
NOW = datetime.fromisoformat("2026-05-19T09:00:00")


def read_csv(path: Path) -> list[dict]:
    """Load a UTF-8 CSV file as a list of dicts keyed by its header row.

    All values are returned as strings, exactly as ``csv.DictReader`` yields them.
    """
    with path.open("r", newline="", encoding="utf-8") as handle:
        records = list(csv.DictReader(handle))
    return records


def write_csv(path: Path, rows: list[dict], fieldnames: list[str] | None = None) -> None:
    """Overwrite ``path`` with ``rows`` as UTF-8 CSV, header first.

    When ``fieldnames`` is falsy (None or empty), the column order comes from the
    keys of the first row. With no rows and no fieldnames, the header is empty.
    """
    columns = fieldnames
    if not columns:
        columns = list(rows[0]) if rows else []
    with path.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        for row in rows:
            writer.writerow(row)


def ts(value: str) -> datetime | None:
    """Parse an ISO-8601 timestamp string; any falsy value (``""``, None) gives None."""
    if not value:
        return None
    return datetime.fromisoformat(value)


def safe_float(value: str | float | int) -> float:
    """Convert ``value`` with ``float()``; None, blank, or non-numeric input yields 0.0."""
    try:
        parsed = float(value)
    except (TypeError, ValueError):
        parsed = 0.0
    return parsed


def safe_int(value: str | int) -> int:
    """Convert ``value`` with ``int()``; unparseable input yields 0.

    None, blank strings, and non-integer strings such as ``"4.0"`` all map to 0.
    """
    try:
        parsed = int(value)
    except (TypeError, ValueError):
        parsed = 0
    return parsed


def score_recency(days: int) -> int:
    """Map days since the last order to a recency score (more recent scores higher).

    Bands: <=7 -> 5, <=30 -> 4, <=90 -> 3, anything older -> 1. No band maps to 2.
    """
    for upper_bound, score in ((7, 5), (30, 4), (90, 3)):
        if days <= upper_bound:
            return score
    return 1


def score_frequency(count: int) -> int:
    """Map a paid-order count to a frequency score.

    Bands: <=1 -> 1, <=3 -> 2, <=6 -> 4, more -> 5. No band maps to 3.
    """
    bands = ((1, 1), (3, 2), (6, 4))
    return next((score for ceiling, score in bands if count <= ceiling), 5)


def score_monetary(amount: float) -> int:
    """Map total paid revenue (USD) to a monetary score.

    Bands: <=50 -> 1, <=200 -> 2, <=500 -> 4, more -> 5. No band maps to 3.
    """
    bands = ((50, 1), (200, 2), (500, 4))
    return next((score for ceiling, score in bands if amount <= ceiling), 5)


# Explicit column orders for every written table. They keep headers present
# and stable even when a table has no rows.
_IDENTITY_FIELDS = [
    "customer_id",
    "resolved_customer_key",
    "email_hash",
    "loyalty_id",
    "phone_hash",
    "country",
    "preferred_channel",
    "identity_rule",
]
_SILVER_FIELDS = [
    "customer_id", "country", "preferred_channel", "created_at", "marketing_consent",
    "personalization_consent", "order_count", "total_revenue_usd", "click_events_45d",
    "support_tickets_120d", "campaigns_sent_90d",
]
_GOLD_FIELDS = [
    "customer_id", "country", "preferred_channel", "marketing_consent", "personalization_consent",
    "order_count", "total_revenue_usd", "avg_order_value_usd", "days_since_last_order",
    "click_events_45d", "sessions_45d", "support_tickets_120d", "avg_csat", "open_rate",
    "click_rate", "customer_value_score", "churn_risk_score", "segment", "retention_priority",
]
_SEGMENT_SUMMARY_FIELDS = ["segment", "customers", "revenue_usd", "avg_churn_risk", "avg_value_score"]
_KPI_FIELDS = ["metric", "value"]

# Sort rank for retention_priority in customer_360 (higher rank is listed first).
# This avoids depending on the labels happening to sort alphabetically.
_PRIORITY_RANK = {"urgent": 2, "nurture": 1, "monitor": 0}


def _parse_optional(value, cast):
    """Return ``cast(value)``, or None when value is None, blank, or unparseable."""
    if value is None or not str(value).strip():
        return None
    try:
        return cast(value)
    except (TypeError, ValueError):
        return None


def _later(candidate: datetime | None, current: datetime | None) -> datetime | None:
    """Return the later of two optional timestamps; a None side is ignored."""
    if candidate is None:
        return current
    if current is None or candidate > current:
        return candidate
    return current


def _copy_raw_to_bronze() -> None:
    """Copy each raw ``*.csv`` byte-for-byte into BRONZE as ``<stem>_bronze.csv``."""
    for source_file in RAW.glob("*.csv"):
        shutil.copy2(source_file, BRONZE / f"{source_file.stem}_bronze.csv")


def _latest_consent_by_customer(consent: list[dict]) -> dict[str, dict]:
    """Keep the consent row with the greatest event_time per customer_id.

    On ties the earlier row in file order wins. A row with a blank event_time
    is kept only when no earlier row exists for that customer; it never
    replaces a row with a later timestamp.
    """
    latest: dict[str, dict] = {}
    for row in consent:
        cid = row["customer_id"]
        current = latest.get(cid)
        if current is None:
            latest[cid] = row
            continue
        new_time = ts(row["event_time"])
        old_time = ts(current["event_time"])
        if new_time is not None and (old_time is None or new_time > old_time):
            latest[cid] = row
    return latest


def _identity_rows(customers: list[dict]) -> list[dict]:
    """Build silver identity rows; one per customer.

    resolved_customer_key is currently just customer_id (no cross-source
    matching happens here). identity_rule records which key is present.
    """
    return [
        {
            "customer_id": c["customer_id"],
            "resolved_customer_key": c["customer_id"],
            "email_hash": c["email_hash"],
            "loyalty_id": c["loyalty_id"],
            "phone_hash": c["phone_hash"],
            "country": c["country"],
            "preferred_channel": c["preferred_channel"],
            "identity_rule": "loyalty_id" if c["loyalty_id"] else "email_hash",
        }
        for c in customers
    ]


def _aggregate_orders(orders: list[dict]) -> dict[str, dict]:
    """Per customer_id: paid amounts, paid-order count, and latest paid-order time."""
    agg: dict[str, dict] = defaultdict(lambda: {"amounts": [], "last_order_at": None, "order_count": 0})
    for row in orders:
        if row["status"] != "paid":
            continue
        entry = agg[row["customer_id"]]
        entry["amounts"].append(safe_float(row["amount_usd"]))
        entry["order_count"] += 1
        entry["last_order_at"] = _later(ts(row["event_time"]), entry["last_order_at"])
    return agg


def _aggregate_clicks(clicks: list[dict]) -> dict[str, dict]:
    """Per customer_id: click count, distinct session ids, latest click time.

    Rows without a customer_id are skipped. last_click_at is tracked but not
    currently surfaced in any output table.
    """
    agg: dict[str, dict] = defaultdict(lambda: {"events": 0, "sessions": set(), "last_click_at": None})
    for row in clicks:
        cid = row.get("customer_id", "")
        if not cid:
            continue
        entry = agg[cid]
        entry["events"] += 1
        entry["sessions"].add(row["session_id"])
        entry["last_click_at"] = _later(ts(row["event_time"]), entry["last_click_at"])
    return agg


def _aggregate_tickets(tickets: list[dict]) -> dict[str, dict]:
    """Per customer_id: ticket count plus parsed CSAT and resolution-hour values.

    Blank or unparseable csat / resolved_hours are left out of the value lists
    rather than recorded as 0. Zeros would drag avg_csat / avg_resolution_hours
    down. Every row still counts toward the ticket count.
    """
    agg: dict[str, dict] = defaultdict(lambda: {"count": 0, "csat": [], "resolved_hours": []})
    for row in tickets:
        entry = agg[row["customer_id"]]
        entry["count"] += 1
        csat = _parse_optional(row["csat"], int)
        if csat is not None:
            entry["csat"].append(csat)
        hours = _parse_optional(row["resolved_hours"], float)
        if hours is not None:
            entry["resolved_hours"].append(hours)
    return agg


def _aggregate_campaigns(campaigns: list[dict]) -> dict[str, dict]:
    """Per customer_id: campaign sends, and sends with a non-blank opened_at / clicked_at."""
    agg: dict[str, dict] = defaultdict(lambda: {"sent": 0, "opened": 0, "clicked": 0})
    for row in campaigns:
        entry = agg[row["customer_id"]]
        entry["sent"] += 1
        if row["opened_at"]:
            entry["opened"] += 1
        if row["clicked_at"]:
            entry["clicked"] += 1
    return agg


def _segment_for(value_score: float) -> str:
    """Bucket the averaged RFM value score into a named segment."""
    if value_score <= 1.8:
        return "new_or_low"
    if value_score <= 3.2:
        return "developing"
    if value_score <= 4.3:
        return "valuable"
    return "champion"


def _retention_priority(churn_risk: float) -> str:
    """Map churn risk to an action tier: >0.65 urgent, >0.35 nurture, else monitor."""
    if churn_risk > 0.65:
        return "urgent"
    if churn_risk > 0.35:
        return "nurture"
    return "monitor"


def _build_profile(customer: dict, oa: dict, ca: dict, ta: dict, ma: dict, lc: dict) -> dict:
    """Combine one customer's aggregates into a full profile row.

    ``oa``/``ca``/``ta``/``ma`` are that customer's order/click/ticket/campaign
    aggregates. ``lc`` is the latest consent row, or ``{}`` when there is none;
    consent then defaults to "false".

    NOTE(review): the ``*_45d`` / ``*_90d`` / ``*_120d`` column names imply
    look-back windows, but no window is applied here; every raw event is
    counted. Confirm the raw extracts are already windowed.
    """
    amounts = oa["amounts"]
    order_count = int(oa["order_count"])
    total_revenue = round(sum(amounts), 2)
    avg_order_value = round(mean(amounts), 2) if amounts else 0.0
    # 999 is the "never ordered" sentinel; it lands in the lowest recency band.
    days_since_last_order = (NOW - oa["last_order_at"]).days if oa["last_order_at"] else 999
    sent = ma["sent"]
    open_rate = round(ma["opened"] / sent, 4) if sent else 0.0
    click_rate = round(ma["clicked"] / sent, 4) if sent else 0.0
    avg_csat = round(mean(ta["csat"]), 2) if ta["csat"] else 0.0
    avg_resolution_hours = round(mean(ta["resolved_hours"]), 2) if ta["resolved_hours"] else 0.0
    value_score = round(
        (score_recency(days_since_last_order) + score_frequency(order_count) + score_monetary(total_revenue)) / 3,
        2,
    )
    # Weighted churn risk. Components:
    #   45%  order staleness (capped at 180 days)
    #   25%  low click activity (capped at 20 events)
    #   20%  support load (capped at 5 tickets)
    #   10%  low email open rate
    churn_risk = round(
        0.45 * min(days_since_last_order, 180) / 180
        + 0.25 * (1 - min(ca["events"], 20) / 20)
        + 0.20 * min(ta["count"], 5) / 5
        + 0.10 * (1 - min(open_rate, 1)),
        3,
    )
    return {
        "customer_id": customer["customer_id"],
        "country": customer["country"],
        "preferred_channel": customer["preferred_channel"],
        "created_at": customer["created_at"],
        "marketing_consent": lc.get("marketing_consent", "false"),
        "personalization_consent": lc.get("personalization_consent", "false"),
        "order_count": order_count,
        "total_revenue_usd": total_revenue,
        "avg_order_value_usd": avg_order_value,
        "days_since_last_order": days_since_last_order,
        "click_events_45d": ca["events"],
        "sessions_45d": len(ca["sessions"]),
        "support_tickets_120d": ta["count"],
        "avg_csat": avg_csat,
        "avg_resolution_hours": avg_resolution_hours,
        "campaigns_sent_90d": sent,
        "open_rate": open_rate,
        "click_rate": click_rate,
        "customer_value_score": value_score,
        "churn_risk_score": churn_risk,
        "segment": _segment_for(value_score),
        "retention_priority": _retention_priority(churn_risk),
    }


def _segment_summary(profile_rows: list[dict]) -> list[dict]:
    """Per-segment customer count, revenue sum, and mean churn/value scores, sorted by segment."""
    by_segment: dict[str, list[dict]] = defaultdict(list)
    for row in profile_rows:
        by_segment[row["segment"]].append(row)
    return [
        {
            "segment": segment,
            "customers": len(rows),
            "revenue_usd": round(sum(float(r["total_revenue_usd"]) for r in rows), 2),
            "avg_churn_risk": round(mean(float(r["churn_risk_score"]) for r in rows), 3),
            "avg_value_score": round(mean(float(r["customer_value_score"]) for r in rows), 2),
        }
        for segment, rows in sorted(by_segment.items())
    ]


def _kpi_rows(customers: list[dict], orders: list[dict], profile_rows: list[dict], customer_360: list[dict]) -> list[dict]:
    """Headline KPIs. Rates are 0.0 when there are no customers."""
    paid_orders = [row for row in orders if row["status"] == "paid"]
    customer_total = len(customers)

    def rate(numerator: int) -> float:
        return round(numerator / customer_total, 4) if customer_total else 0.0

    marketing_consent_count = sum(1 for row in profile_rows if row["marketing_consent"] == "true")
    urgent_count = sum(1 for row in profile_rows if row["retention_priority"] == "urgent")
    return [
        {"metric": "customers", "value": customer_total},
        {"metric": "paid_orders", "value": len(paid_orders)},
        # Includes paid orders whose customer_id is not in customers.csv.
        {"metric": "gross_revenue_usd", "value": round(sum(safe_float(row["amount_usd"]) for row in paid_orders), 2)},
        # NOTE(review): customer_360 has one row per customer, so this is always
        # 1.0 for a non-empty customer list. Confirm the intended definition.
        {"metric": "profile_coverage_rate", "value": rate(len(customer_360))},
        {"metric": "marketing_consent_rate", "value": rate(marketing_consent_count)},
        {"metric": "urgent_retention_customers", "value": urgent_count},
    ]


def main() -> None:
    """Build the bronze, silver, and gold Customer 360 outputs.

    Steps:
    1. Read the six raw CSV extracts from RAW.
    2. Copy every raw ``*.csv`` into BRONZE unchanged.
    3. Write customer_identity and customer_profile_base to SILVER.
    4. Write customer_360, segment_summary, and kpi_summary to GOLD.
    """
    customers = read_csv(RAW / "customers.csv")
    orders = read_csv(RAW / "orders.csv")
    clicks = read_csv(RAW / "clickstream.csv")
    tickets = read_csv(RAW / "support_tickets.csv")
    campaigns = read_csv(RAW / "campaign_events.csv")
    consent = read_csv(RAW / "consent_events.csv")

    _copy_raw_to_bronze()

    latest_consent = _latest_consent_by_customer(consent)
    write_csv(SILVER / "customer_identity.csv", _identity_rows(customers), _IDENTITY_FIELDS)

    order_agg = _aggregate_orders(orders)
    click_agg = _aggregate_clicks(clicks)
    ticket_agg = _aggregate_tickets(tickets)
    campaign_agg = _aggregate_campaigns(campaigns)

    profile_rows = []
    for customer in customers:
        cid = customer["customer_id"]
        profile_rows.append(
            _build_profile(
                customer,
                order_agg[cid],
                click_agg[cid],
                ticket_agg[cid],
                campaign_agg[cid],
                latest_consent.get(cid, {}),
            )
        )

    write_csv(
        SILVER / "customer_profile_base.csv",
        [{k: row[k] for k in _SILVER_FIELDS} for row in profile_rows],
        _SILVER_FIELDS,
    )

    # Most urgent first; within a priority, highest value score first.
    customer_360 = sorted(
        ({k: row[k] for k in _GOLD_FIELDS} for row in profile_rows),
        key=lambda r: (_PRIORITY_RANK.get(r["retention_priority"], -1), r["customer_value_score"]),
        reverse=True,
    )
    write_csv(GOLD / "customer_360.csv", customer_360, _GOLD_FIELDS)

    write_csv(GOLD / "segment_summary.csv", _segment_summary(profile_rows), _SEGMENT_SUMMARY_FIELDS)
    write_csv(GOLD / "kpi_summary.csv", _kpi_rows(customers, orders, profile_rows, customer_360), _KPI_FIELDS)
    print(f"Built Customer 360 outputs in {BASE / 'data'}")


if __name__ == "__main__":
    main()
