Chapter 2 Lab: CSV, JSON, Parquet, and Quality

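This notebook converts small TuranMart CSV and JSON Lines files into typed Parquet outputs and validates them with simple quality checks. Run it from the repository root or from any directory inside the repository (the root is located automatically by walking up from the current working directory); otherwise, adjust ROOT below.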

In [ ]:
from pathlib import Path
import pandas as pd
import duckdb


def find_repo_root(start: Path) -> Path:
    """Locate the repository root by walking upward from ``start``.

    The resolved ``start`` is checked first, then each of its ancestors from
    nearest to farthest. The first directory that contains an entry at
    ``shared/labs/ch02_data_models_formats_quality`` is the root.

    Args:
        start: Directory from which the upward search begins.

    Returns:
        The first matching directory, as a resolved path.

    Raises:
        FileNotFoundError: If neither ``start`` nor any of its ancestors
            contains the lab directory.
    """
    lab_marker = Path("shared") / "labs" / "ch02_data_models_formats_quality"
    origin = start.resolve()
    search_path = (origin, *origin.parents)
    match = next((folder for folder in search_path if (folder / lab_marker).exists()), None)
    if match is None:
        raise FileNotFoundError("Could not locate repository root containing shared/labs/ch02_data_models_formats_quality")
    return match

# Derive every lab path from the detected repository root, so the notebook
# does not depend on which directory inside the repository it is started from.
ROOT = find_repo_root(Path.cwd())
LAB = ROOT.joinpath("shared", "labs", "ch02_data_models_formats_quality")
DATA = LAB.joinpath("data")
OUT = LAB.joinpath("output", "parquet")

# Create the Parquet output directory with any missing parents; this is a
# no-op when the directory already exists.
OUT.mkdir(exist_ok=True, parents=True)

print("Repository root:", ROOT)
print("Lab directory:", LAB)

1. Load raw CSV and JSON Lines files

In [ ]:
# Read the two CSV sources from DATA; dtypes are whatever pandas infers here.
orders = pd.read_csv(DATA.joinpath("orders.csv"))
order_items = pd.read_csv(DATA.joinpath("order_items.csv"))

# events.jsonl is read as JSON Lines (one JSON document per line).
events = pd.read_json(DATA.joinpath("events.jsonl"), lines=True)

# Row counts as a quick check that each file loaded.
print("orders:", orders.shape[0])
print("order_items:", order_items.shape[0])
print("events:", events.shape[0])

# Preview the first rows of the orders frame as the cell output.
orders.head()

2. Apply explicit types

In [ ]:
# Pin explicit dtypes on the loaded frames. Every step in this cell is safe to
# re-run: the timestamp parsing and casts are repeated on already-typed
# columns, and the campaign flattening is guarded below.

# Parse timestamps as timezone-aware UTC datetimes and cast numeric columns.
orders["order_ts"] = pd.to_datetime(orders["order_ts"], utc=True)
orders["total_amount"] = orders["total_amount"].astype("float64")
order_items["quantity"] = order_items["quantity"].astype("int64")
order_items["unit_price"] = order_items["unit_price"].astype("float64")
events["event_ts"] = pd.to_datetime(events["event_ts"], utc=True)

# Flatten the nested `campaign` object into two scalar columns, then drop it.
# Rows whose `campaign` value is not a dict get None in both columns.
# The guard matters because this step removes `campaign`: without it, running
# the cell a second time would raise KeyError on the missing column.
if "campaign" in events.columns:
    events["campaign_id"] = events["campaign"].apply(
        lambda x: x.get("campaign_id") if isinstance(x, dict) else None
    )
    events["campaign_channel"] = events["campaign"].apply(
        lambda x: x.get("channel") if isinstance(x, dict) else None
    )
    events = events.drop(columns=["campaign"])
elif not {"campaign_id", "campaign_channel"}.issubset(events.columns):
    # Neither the raw column nor the flattened columns exist, so this is not a
    # re-run: fail as a direct lookup of the missing column would.
    raise KeyError("campaign")

# Show the resulting dtypes of the orders frame as the cell output.
orders.dtypes

3. Run quality checks

In [ ]:
# Allowed values for orders["payment_status"].
valid_statuses = {"paid", "pending", "refunded", "cancelled", "shipped"}

# Columns that must be populated on every order row.
required_order_columns = ["order_id", "customer_id", "order_ts", "payment_status"]
known_order_ids = orders["order_id"]

# Map each check name to its boolean outcome (True means the check holds).
checks = {
    "orders_have_unique_ids": known_order_ids.is_unique,
    "orders_required_fields_not_null": orders[required_order_columns].notna().all().all(),
    "payment_status_is_valid": orders["payment_status"].isin(valid_statuses).all(),
    "items_have_positive_quantity": order_items["quantity"].gt(0).all(),
    "items_reference_existing_orders": order_items["order_id"].isin(known_order_ids).all(),
    "events_have_unique_ids": events["event_id"].is_unique,
}

# One report row per check, labelled PASS or FAIL.
quality_report = pd.DataFrame(
    {
        "check": list(checks.keys()),
        "status": ["PASS" if bool(outcome) else "FAIL" for outcome in checks.values()],
    }
)
quality_report
In [ ]:
# Stop here when any check is marked FAIL, so that a top-to-bottom run does
# not continue to the cells below.
failed_rows = quality_report["status"].eq("FAIL")
if failed_rows.any():
    raise ValueError("One or more quality checks failed")

4. Write Parquet outputs

In [ ]:
# Write each frame to its Parquet file under OUT; index=False leaves the
# pandas index out of the file.
for frame, filename in (
    (orders, "orders.parquet"),
    (order_items, "order_items.parquet"),
    (events, "events.parquet"),
):
    frame.to_parquet(OUT / filename, index=False)

# Report every Parquet file currently in OUT, sorted by path, with its size.
for path in sorted(OUT.glob("*.parquet")):
    print(path.name, path.stat().st_size, "bytes")

5. Query Parquet with DuckDB

In [ ]:
# Aggregate paid and refunded orders per calendar date and region, reading
# straight from the Parquet file written above.
#
# The file path is embedded in a single-quoted SQL string literal, so any
# single quote in the path (e.g. a checkout under ".../O'Brien/...") must be
# doubled; otherwise the literal ends early and the query fails to parse.
orders_parquet_sql = (OUT / "orders.parquet").as_posix().replace("'", "''")

# NOTE(review): order_ts was parsed with utc=True before being written.
# Confirm whether CAST(order_ts AS DATE) depends on the DuckDB session time
# zone, which could shift order_date on machines outside UTC.
query = f"""
SELECT
    CAST(order_ts AS DATE) AS order_date,
    region,
    COUNT(*) AS order_count,
    ROUND(SUM(total_amount), 2) AS total_amount
FROM read_parquet('{orders_parquet_sql}')
WHERE payment_status IN ('paid', 'refunded')
GROUP BY 1, 2
ORDER BY 1, 2
"""
duckdb.sql(query).df()

Reflection

The conversion succeeded because the notebook made assumptions executable: identifiers are unique, required fields are present, statuses are valid, quantities are positive, and line items refer to known orders. In production, the same ideas should run automatically inside pipelines and orchestration workflows.