"""Repository pattern for NCAA basketball data storage.
Defines an abstract ``Repository`` interface and a concrete
``ParquetRepository`` implementation backed by Apache Parquet files.
The abstraction lets downstream code remain storage-agnostic — a SQLite
implementation can be plugged in later (Story 5.5) without changing any
business logic.
"""
from __future__ import annotations
import abc
from pathlib import Path
import pandas as pd # type: ignore[import-untyped]
import pyarrow as pa # type: ignore[import-untyped]
import pyarrow.dataset as ds # type: ignore[import-untyped]
import pyarrow.parquet as pq # type: ignore[import-untyped]
from pydantic_core import PydanticUndefined
from ncaa_eval.ingest.schema import Game, Season, Team
# ---------------------------------------------------------------------------
# Abstract Repository
# ---------------------------------------------------------------------------
class Repository(abc.ABC):
    """Storage-agnostic interface for persisting NCAA basketball data.

    Concrete backends (Parquet today, SQLite later) implement this
    contract so business logic never depends on the storage format.
    """

    # -- reads ---------------------------------------------------------------

    @abc.abstractmethod
    def get_teams(self) -> list[Team]:
        """Return every stored team record."""
        ...

    @abc.abstractmethod
    def get_games(self, season: int) -> list[Game]:
        """Return every game played in the given *season* year."""
        ...

    @abc.abstractmethod
    def get_seasons(self) -> list[Season]:
        """Return every stored season record."""
        ...

    # -- writes --------------------------------------------------------------

    @abc.abstractmethod
    def save_teams(self, teams: list[Team]) -> None:
        """Persist *teams*, replacing any previously stored set."""
        ...

    @abc.abstractmethod
    def save_games(self, games: list[Game]) -> None:
        """Persist *games*, replacing each affected season partition."""
        ...

    @abc.abstractmethod
    def save_seasons(self, seasons: list[Season]) -> None:
        """Persist *seasons*, replacing any previously stored set."""
        ...
# ---------------------------------------------------------------------------
# Parquet Repository
# ---------------------------------------------------------------------------
# Explicit PyArrow schemas for deterministic column types across reads/writes.
# Explicit PyArrow schemas so column types are deterministic across
# reads and writes, regardless of what inference would pick.
_TEAM_SCHEMA = pa.schema(
    [
        pa.field("team_id", pa.int64()),
        pa.field("team_name", pa.string()),
        pa.field("canonical_name", pa.string()),
    ]
)

_SEASON_SCHEMA = pa.schema([pa.field("year", pa.int64())])

_GAME_SCHEMA = pa.schema(
    [
        pa.field("game_id", pa.string()),
        pa.field("season", pa.int64()),
        pa.field("day_num", pa.int64()),
        pa.field("date", pa.date32()),
        pa.field("w_team_id", pa.int64()),
        pa.field("l_team_id", pa.int64()),
        pa.field("w_score", pa.int64()),
        pa.field("l_score", pa.int64()),
        pa.field("loc", pa.string()),
        pa.field("num_ot", pa.int64()),
        pa.field("is_tournament", pa.bool_()),
    ]
)
def _apply_model_defaults(df: pd.DataFrame, model: type[Game]) -> None:
    """Fill null cells in *df* with the model's non-None field defaults, in place.

    When pyarrow unifies schemas across partitions written at different
    schema versions, columns absent from older partitions come back as
    null. Re-applying the Pydantic defaults here keeps ``model(**row)``
    from receiving ``None`` for a field that declares a concrete default.
    """
    for name, info in model.model_fields.items():
        if name not in df.columns:
            continue
        default = info.default
        # Skip fields with no usable default: either the field is required
        # (default is the PydanticUndefined sentinel) or its default is None.
        if default is PydanticUndefined or default is None:
            continue
        df[name] = df[name].fillna(default)
class ParquetRepository(Repository):
    """Repository implementation backed by Parquet files.

    Directory layout::

        {base_path}/
            teams.parquet
            seasons.parquet
            games/
                season={year}/
                    data.parquet
    """

    def __init__(self, base_path: Path) -> None:
        self._base_path = base_path

    # -- reads ---------------------------------------------------------------

    def get_teams(self) -> list[Team]:
        """Load all teams from the teams Parquet file (empty list if absent)."""
        path = self._base_path / "teams.parquet"
        if not path.exists():
            return []
        df = pd.read_parquet(path, engine="pyarrow")
        return [Team(**row) for row in df.to_dict(orient="records")]

    def get_games(self, season: int) -> list[Game]:
        """Load games for a single *season* from hive-partitioned Parquet."""
        games_dir = self._base_path / "games"
        if not games_dir.exists():
            return []
        dataset = ds.dataset(
            games_dir,
            format="parquet",
            partitioning=ds.partitioning(pa.schema([("season", pa.int64())]), flavor="hive"),
        )
        table = dataset.to_table(filter=ds.field("season") == season)
        if table.num_rows == 0:
            return []
        df = table.to_pandas()
        # Schema evolution: when the dataset spans partitions with different
        # schemas (e.g., older files lack columns added later), pyarrow fills
        # missing cells with null after unifying schemas. Re-apply Pydantic
        # defaults for any column whose model field has a non-None default so
        # model construction doesn't fail on unexpected null input.
        _apply_model_defaults(df, Game)
        return [Game(**row) for row in df.to_dict(orient="records")]

    def get_seasons(self) -> list[Season]:
        """Load all season records from the seasons Parquet file (empty list if absent)."""
        path = self._base_path / "seasons.parquet"
        if not path.exists():
            return []
        df = pd.read_parquet(path, engine="pyarrow")
        return [Season(**row) for row in df.to_dict(orient="records")]

    # -- writes --------------------------------------------------------------

    def _write_flat_table(self, filename: str, rows: list[Team] | list[Season], schema: pa.Schema) -> None:
        """Write *rows* to ``{base_path}/{filename}`` with the given column *schema*.

        Shared by :meth:`save_teams` and :meth:`save_seasons`, which were
        previously duplicated line-for-line.
        """
        self._base_path.mkdir(parents=True, exist_ok=True)
        table = pa.Table.from_pydict(
            {name: [getattr(r, name) for r in rows] for name in schema.names},
            schema=schema,
        )
        pq.write_table(table, self._base_path / filename)

    def save_teams(self, teams: list[Team]) -> None:
        """Persist team records to a Parquet file. No-op for an empty list."""
        if not teams:
            return
        self._write_flat_table("teams.parquet", teams, _TEAM_SCHEMA)

    def save_games(self, games: list[Game]) -> None:
        """Persist game records to hive-partitioned Parquet by season.

        Each affected ``season={year}`` partition is overwritten; untouched
        seasons are left in place. No-op for an empty list.
        """
        if not games:
            return
        games_dir = self._base_path / "games"
        # Group games by season for partitioned writes.
        by_season: dict[int, list[Game]] = {}
        for g in games:
            by_season.setdefault(g.season, []).append(g)
        # The partition column lives in the directory name (hive layout), so
        # drop it from the per-file schema. Built once — it is loop-invariant.
        write_schema = pa.schema([f for f in _GAME_SCHEMA if f.name != "season"])
        for season_year, season_games in by_season.items():
            partition_dir = games_dir / f"season={season_year}"
            partition_dir.mkdir(parents=True, exist_ok=True)
            data = {f.name: [getattr(g, f.name) for g in season_games] for f in write_schema}
            table = pa.Table.from_pydict(data, schema=write_schema)
            pq.write_table(table, partition_dir / "data.parquet")

    def save_seasons(self, seasons: list[Season]) -> None:
        """Persist season records to a Parquet file. No-op for an empty list."""
        if not seasons:
            return
        self._write_flat_table("seasons.parquet", seasons, _SEASON_SCHEMA)