Source code for ncaa_eval.ingest.repository

"""Repository pattern for NCAA basketball data storage.

Defines an abstract ``Repository`` interface and a concrete
``ParquetRepository`` implementation backed by Apache Parquet files.
The abstraction lets downstream code remain storage-agnostic — a SQLite
implementation can be plugged in later (Story 5.5) without changing any
business logic.
"""

from __future__ import annotations

import abc
from pathlib import Path

import pandas as pd  # type: ignore[import-untyped]
import pyarrow as pa  # type: ignore[import-untyped]
import pyarrow.dataset as ds  # type: ignore[import-untyped]
import pyarrow.parquet as pq  # type: ignore[import-untyped]
from pydantic_core import PydanticUndefined

from ncaa_eval.ingest.schema import Game, Season, Team

# ---------------------------------------------------------------------------
# Abstract Repository
# ---------------------------------------------------------------------------


class Repository(abc.ABC):
    """Abstract base class for NCAA data persistence.

    Concrete subclasses (e.g. :class:`ParquetRepository`) supply the
    storage mechanism; callers program against this interface only, so
    an alternative backend can be swapped in without touching business
    logic.
    """

    @abc.abstractmethod
    def get_teams(self) -> list[Team]:
        """Return every stored team record."""

    @abc.abstractmethod
    def get_games(self, season: int) -> list[Game]:
        """Return every game belonging to the given *season* year."""

    @abc.abstractmethod
    def get_seasons(self) -> list[Season]:
        """Return every stored season record."""

    @abc.abstractmethod
    def save_teams(self, teams: list[Team]) -> None:
        """Write the given teams to storage, replacing what is there."""

    @abc.abstractmethod
    def save_games(self, games: list[Game]) -> None:
        """Write the given games to storage (overwrite per season partition)."""

    @abc.abstractmethod
    def save_seasons(self, seasons: list[Season]) -> None:
        """Write the given seasons to storage, replacing what is there."""
# ---------------------------------------------------------------------------
# Parquet Repository
# ---------------------------------------------------------------------------

# Explicit PyArrow schemas pin the column types so reads and writes stay
# deterministic instead of depending on whatever pandas would infer.
_TEAM_SCHEMA = pa.schema(
    [
        ("team_id", pa.int64()),
        ("team_name", pa.string()),
        ("canonical_name", pa.string()),
    ]
)

_SEASON_SCHEMA = pa.schema(
    [
        ("year", pa.int64()),
    ]
)

_GAME_SCHEMA = pa.schema(
    [
        ("game_id", pa.string()),
        ("season", pa.int64()),
        ("day_num", pa.int64()),
        ("date", pa.date32()),
        ("w_team_id", pa.int64()),
        ("l_team_id", pa.int64()),
        ("w_score", pa.int64()),
        ("l_score", pa.int64()),
        ("loc", pa.string()),
        ("num_ot", pa.int64()),
        ("is_tournament", pa.bool_()),
    ]
)


def _apply_model_defaults(df: pd.DataFrame, model: type[Game]) -> None:
    """Fill null values in *df* with non-None Pydantic field defaults.

    When pyarrow unifies schemas across partitions written at different
    schema versions, a column present only in newer partitions comes back
    null for the older rows.  Re-applying the model defaults here keeps
    ``model(**row)`` from receiving ``None`` for a field that declares a
    concrete default value.  Mutates *df* in place; returns nothing.
    """
    undefined = PydanticUndefined
    for field_name, info in model.model_fields.items():
        if field_name not in df.columns:
            continue
        fallback = info.default
        # Fields with no declared default (PydanticUndefined) or an
        # explicit None default are skipped: null is already correct.
        if fallback is undefined or fallback is None:
            continue
        df[field_name] = df[field_name].fillna(fallback)
class ParquetRepository(Repository):
    """Repository implementation backed by Parquet files.

    Directory layout::

        {base_path}/
            teams.parquet
            seasons.parquet
            games/
                season={year}/
                    data.parquet
    """

    def __init__(self, base_path: Path) -> None:
        # Root directory for all Parquet artifacts; created lazily on write.
        self._base_path = base_path

    # -- reads ---------------------------------------------------------------

    def get_teams(self) -> list[Team]:
        """Load all teams from the teams Parquet file."""
        teams_file = self._base_path / "teams.parquet"
        if not teams_file.exists():
            return []
        frame = pd.read_parquet(teams_file, engine="pyarrow")
        return [Team(**record) for record in frame.to_dict(orient="records")]

    def get_games(self, season: int) -> list[Game]:
        """Load games for a single season from hive-partitioned Parquet."""
        games_root = self._base_path / "games"
        if not games_root.exists():
            return []
        hive_partitioning = ds.partitioning(
            pa.schema([("season", pa.int64())]), flavor="hive"
        )
        dataset = ds.dataset(
            games_root, format="parquet", partitioning=hive_partitioning
        )
        table = dataset.to_table(filter=ds.field("season") == season)
        if table.num_rows == 0:
            return []
        frame = table.to_pandas()
        # Schema evolution: partitions written before a column existed come
        # back with nulls in that column once pyarrow unifies the schemas.
        # Re-apply the Pydantic defaults so Game(**row) is not handed None
        # for a field that declares a concrete default.
        _apply_model_defaults(frame, Game)
        return [Game(**record) for record in frame.to_dict(orient="records")]

    def get_seasons(self) -> list[Season]:
        """Load all season records from the seasons Parquet file."""
        seasons_file = self._base_path / "seasons.parquet"
        if not seasons_file.exists():
            return []
        frame = pd.read_parquet(seasons_file, engine="pyarrow")
        return [Season(**record) for record in frame.to_dict(orient="records")]

    # -- writes --------------------------------------------------------------

    def save_teams(self, teams: list[Team]) -> None:
        """Persist team records to a Parquet file.  No-op for an empty list."""
        if not teams:
            return
        self._base_path.mkdir(parents=True, exist_ok=True)
        columns = {
            name: [getattr(team, name) for team in teams]
            for name in _TEAM_SCHEMA.names
        }
        pq.write_table(
            pa.Table.from_pydict(columns, schema=_TEAM_SCHEMA),
            self._base_path / "teams.parquet",
        )

    def save_games(self, games: list[Game]) -> None:
        """Persist game records to hive-partitioned Parquet by season.

        Each season present in *games* overwrites its own partition;
        partitions for seasons not present are left untouched.
        """
        if not games:
            return
        games_root = self._base_path / "games"
        # Bucket the games by season so each partition is written whole.
        by_season: dict[int, list[Game]] = {}
        for game in games:
            by_season.setdefault(game.season, []).append(game)
        # The partition column is encoded in the directory name (hive
        # style), so it is excluded from the per-file schema.
        write_schema = pa.schema([f for f in _GAME_SCHEMA if f.name != "season"])
        for season_year, bucket in by_season.items():
            partition_dir = games_root / f"season={season_year}"
            partition_dir.mkdir(parents=True, exist_ok=True)
            columns = {
                field.name: [getattr(game, field.name) for game in bucket]
                for field in write_schema
            }
            pq.write_table(
                pa.Table.from_pydict(columns, schema=write_schema),
                partition_dir / "data.parquet",
            )

    def save_seasons(self, seasons: list[Season]) -> None:
        """Persist season records to a Parquet file.  No-op for an empty list."""
        if not seasons:
            return
        self._base_path.mkdir(parents=True, exist_ok=True)
        columns = {
            name: [getattr(season, name) for season in seasons]
            for name in _SEASON_SCHEMA.names
        }
        pq.write_table(
            pa.Table.from_pydict(columns, schema=_SEASON_SCHEMA),
            self._base_path / "seasons.parquet",
        )