#!/usr/bin/env python3
"""Build a deterministic Bronze-Silver-Gold data lake layout for Chapter 6.

The default mode writes to a local folder whose paths mirror S3 object keys. This keeps
the lab deterministic for every reader. The optional MinIO/S3 exercise can later upload
these same files using the same key layout.
"""
from __future__ import annotations

import argparse
import csv
import json
import shutil
from datetime import datetime, timezone
from pathlib import Path

import pandas as pd


REPO_LAB = Path(__file__).resolve().parent
DATA_DIR = REPO_LAB / 'data'
INGEST_DATE = '2026-05-30'


def read_jsonl(path: Path) -> list[dict]:
    return [json.loads(line) for line in path.read_text(encoding='utf-8').splitlines() if line.strip()]


def write_jsonl(path: Path, rows: list[dict]) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text('\n'.join(json.dumps(row, sort_keys=True) for row in rows) + ('\n' if rows else ''), encoding='utf-8')


def load_products(path: Path) -> dict[str, dict]:
    with path.open('r', encoding='utf-8', newline='') as f:
        return {row['product_id']: row for row in csv.DictReader(f)}


def validate_orders(orders: list[dict], products: dict[str, dict]) -> tuple[list[dict], list[dict]]:
    clean: list[dict] = []
    rejected: list[dict] = []
    seen_ids: set[str] = set()
    for row in orders:
        reasons: list[str] = []
        order_id = row.get('order_id')
        if not order_id:
            reasons.append('missing_order_id')
        elif order_id in seen_ids:
            reasons.append('duplicate_order_id')
        else:
            seen_ids.add(order_id)

        product_id = row.get('product_id')
        if product_id not in products:
            reasons.append('unknown_product_id')

        try:
            quantity = int(row.get('quantity'))
            if quantity <= 0:
                reasons.append('non_positive_quantity')
        except Exception:
            quantity = None
            reasons.append('invalid_quantity')

        try:
            unit_price = float(row.get('unit_price'))
            if unit_price < 0:
                reasons.append('negative_unit_price')
        except Exception:
            unit_price = None
            reasons.append('invalid_unit_price')

        try:
            ts = pd.to_datetime(row.get('order_ts'), utc=True)
            if pd.isna(ts):
                raise ValueError('missing timestamp')
            order_date = ts.date().isoformat()
        except Exception:
            ts = None
            order_date = None
            reasons.append('invalid_order_ts')

        if reasons:
            rejected.append({**row, 'rejection_reasons': '|'.join(reasons)})
            continue

        product = products[product_id]
        clean.append({
            'order_id': order_id,
            'customer_id': row.get('customer_id'),
            'product_id': product_id,
            'category': product['category'],
            'product_name': product['product_name'],
            'order_ts': ts.isoformat(),
            'order_date': order_date,
            'quantity': quantity,
            'unit_price': round(unit_price, 2),
            'line_amount': round(quantity * unit_price, 2),
            'channel': row.get('channel'),
        })
    return clean, rejected


def write_parquet_dataset(df: pd.DataFrame, root: Path, partition_col: str | None = None) -> None:
    root.mkdir(parents=True, exist_ok=True)
    if partition_col and not df.empty:
        for value, part in df.groupby(partition_col, sort=True):
            part_dir = root / f'{partition_col}={value}'
            part_dir.mkdir(parents=True, exist_ok=True)
            part.drop(columns=[]).to_parquet(part_dir / 'part-0000.parquet', index=False)
    else:
        df.to_parquet(root / 'part-0000.parquet', index=False)


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument('--lake-root', default='.lake/turanmart-lake-dev', help='Local folder that mirrors the object-storage bucket root.')
    parser.add_argument('--reset', action='store_true', help='Delete the lake root before writing outputs.')
    args = parser.parse_args()

    lake = Path(args.lake_root)
    if args.reset and lake.exists():
        shutil.rmtree(lake)
    lake.mkdir(parents=True, exist_ok=True)

    orders_path = DATA_DIR / 'orders.jsonl'
    products_path = DATA_DIR / 'products.csv'
    orders = read_jsonl(orders_path)
    products = load_products(products_path)

    bronze = lake / f'bronze/commerce/orders/ingest_date={INGEST_DATE}/batch_id=001'
    bronze.mkdir(parents=True, exist_ok=True)
    shutil.copy2(orders_path, bronze / 'orders.jsonl')
    shutil.copy2(products_path, bronze / 'products.csv')
    manifest = {
        'dataset': 'commerce.orders',
        'ingest_date': INGEST_DATE,
        'source_files': ['orders.jsonl', 'products.csv'],
        'bronze_row_count': len(orders),
        'created_by': 'chapter_06_starter.py',
    }
    (bronze / 'manifest.json').write_text(json.dumps(manifest, indent=2, sort_keys=True) + '\n', encoding='utf-8')

    clean, rejected = validate_orders(orders, products)
    clean_df = pd.DataFrame(clean).sort_values(['order_date', 'order_id']).reset_index(drop=True)
    rejected_df = pd.DataFrame(rejected).sort_values(['order_id']).reset_index(drop=True)

    silver_clean_root = lake / 'silver/commerce/orders_clean'
    silver_reject_root = lake / f'silver/commerce/orders_rejected/ingest_date={INGEST_DATE}'
    write_parquet_dataset(clean_df, silver_clean_root, partition_col='order_date')
    write_jsonl(silver_reject_root / 'rejected_orders.jsonl', rejected_df.to_dict('records'))

    gold_df = (
        clean_df.groupby('order_date', as_index=False)
        .agg(order_count=('order_id', 'nunique'), item_count=('quantity', 'sum'), total_revenue=('line_amount', 'sum'))
        .sort_values('order_date')
    )
    gold_df['total_revenue'] = gold_df['total_revenue'].round(2)
    gold_root = lake / 'gold/commerce/daily_revenue'
    write_parquet_dataset(gold_df, gold_root, partition_col='order_date')
    gold_df.to_csv(gold_root / 'daily_revenue.csv', index=False)

    print(f'BRONZE orders rows: {len(orders)}')
    print(f'SILVER clean orders rows: {len(clean_df)}')
    print(f'SILVER rejected orders rows: {len(rejected_df)}')
    print(f'GOLD daily revenue rows: {len(gold_df)}')
    print(f'GOLD total_revenue: {gold_df["total_revenue"].sum():.2f}')
    print(f'Lake root: {lake}')


if __name__ == '__main__':
    main()
