"""Batch opponent adjustment rating solvers: SRS, Ridge regression, Colley Matrix."""
from __future__ import annotations
import logging
import numpy as np
import numpy.typing as npt
import pandas as pd # type: ignore[import-untyped]
from sklearn.linear_model import Ridge # type: ignore[import-untyped]
from ncaa_eval.transform.constants import DEFAULT_MARGIN_CAP
# Module-level logger; solvers report convergence problems through it.
logger = logging.getLogger(__name__)

# Default regularization strength passed to the Ridge solver as ``alpha``.
DEFAULT_RIDGE_LAMBDA: float = 20.0

# Default cap on the number of SRS fixed-point iterations.
DEFAULT_SRS_MAX_ITER: int = 10_000

# Max-abs tolerance used to declare the SRS fixed-point iteration converged.
_SRS_CONVERGENCE_TOL: float = 1e-6
class BatchRatingSolver:
    """Batch rating solver that produces full-season opponent-adjusted ratings.

    All solvers accept a pre-loaded DataFrame of compact regular-season games
    (caller must filter to ``is_tournament == False`` before passing).

    Args:
        margin_cap: Maximum point margin applied per game
            (default ``DEFAULT_MARGIN_CAP``).
        ridge_lambda: Regularization strength for Ridge solver (default 20.0).
        srs_max_iter: Maximum iterations for each SRS fixed-point pass
            (default 10,000).
    """

    def __init__(
        self,
        *,
        margin_cap: int = DEFAULT_MARGIN_CAP,
        ridge_lambda: float = DEFAULT_RIDGE_LAMBDA,
        srs_max_iter: int = DEFAULT_SRS_MAX_ITER,
    ) -> None:
        self._margin_cap = margin_cap
        self._ridge_lambda = ridge_lambda
        self._srs_max_iter = srs_max_iter

    def compute_srs(self, games_df: pd.DataFrame) -> pd.DataFrame:
        """Compute SRS (Simple Rating System) ratings via fixed-point iteration.

        The plain update ``r <- avg_margin + A_norm @ r`` is tried first. When
        a connected part of the schedule is bipartite (e.g. two teams that only
        play each other, or a tree-shaped early-season schedule) that update
        oscillates with period two and never settles, so on non-convergence a
        second pass with an averaged (damped) update is run from the last
        iterate. Both updates share the same fixed point.

        Args:
            games_df: DataFrame with columns ``w_team_id``, ``l_team_id``,
                ``w_score``, ``l_score`` (regular-season games only).

        Returns:
            DataFrame with columns ``["team_id", "srs_rating"]`` (zero-centered).
        """
        if games_df.empty:
            return pd.DataFrame(columns=["team_id", "srs_rating"])

        teams, _, w_idx, l_idx = _build_team_index(games_df)
        n = len(teams)

        # Winner-minus-loser margin, capped from above only.
        raw_margins = (games_df["w_score"] - games_df["l_score"]).to_numpy(dtype=float)
        margins = np.minimum(raw_margins, float(self._margin_cap))
        _, _, avg_margin, A_norm = _build_srs_matrices(n, w_idx, l_idx, margins)

        # First pass: the plain fixed-point iteration r = avg_margin + A_norm @ r.
        r, converged = self._iterate_srs(avg_margin, A_norm, np.zeros(n), relaxation=1.0)
        if not converged:
            # Second pass: averaging each update with the previous iterate
            # cancels the period-2 oscillation without moving the fixed point.
            logger.info(
                "SRS plain iteration did not converge after %d iterations; "
                "retrying with damped updates",
                self._srs_max_iter,
            )
            r, converged = self._iterate_srs(avg_margin, A_norm, r, relaxation=0.5)
        if not converged:
            logger.warning(
                "SRS did not converge after %d iterations; ratings may be inaccurate",
                self._srs_max_iter,
            )

        # Zero-center ratings (enforces unique solution)
        r -= float(np.mean(r))
        return pd.DataFrame({"team_id": teams, "srs_rating": r.tolist()})

    def _iterate_srs(
        self,
        avg_margin: npt.NDArray[np.float64],
        A_norm: npt.NDArray[np.float64],
        start: npt.NDArray[np.float64],
        *,
        relaxation: float,
    ) -> tuple[npt.NDArray[np.float64], bool]:
        """Run up to ``srs_max_iter`` (optionally damped) SRS updates.

        Args:
            avg_margin: Per-team average capped margin.
            A_norm: Row-normalized opponent matrix.
            start: Initial rating vector.
            relaxation: Step fraction towards ``avg_margin + A_norm @ r``;
                ``1.0`` is the plain iteration, smaller values damp it.

        Returns:
            ``(ratings, converged)`` where ``converged`` tells whether the
            update residual dropped below ``_SRS_CONVERGENCE_TOL``.
        """
        r: npt.NDArray[np.float64] = start
        for _ in range(self._srs_max_iter):
            target = avg_margin + A_norm @ r
            residual = float(np.max(np.abs(target - r)))
            # relaxation == 1.0 assigns ``target`` directly so the plain pass
            # stays bit-for-bit identical to an undamped iteration.
            r = target if relaxation == 1.0 else r + relaxation * (target - r)
            if residual < _SRS_CONVERGENCE_TOL:
                return r, True
        return r, False

    def compute_ridge(self, games_df: pd.DataFrame) -> pd.DataFrame:
        """Compute Ridge regression ratings (regularized SRS).

        Each game contributes one row to the design matrix (+1 for the winner,
        -1 for the loser) and its capped margin as the regression target; the
        fitted coefficients, one per team, are the ratings.

        Args:
            games_df: DataFrame with columns ``w_team_id``, ``l_team_id``,
                ``w_score``, ``l_score`` (regular-season games only).

        Returns:
            DataFrame with columns ``["team_id", "ridge_rating"]``.
        """
        if games_df.empty:
            return pd.DataFrame(columns=["team_id", "ridge_rating"])

        teams, _, w_idx, l_idx = _build_team_index(games_df)
        n = len(teams)
        n_games = len(games_df)

        X: npt.NDArray[np.float64] = np.zeros((n_games, n))
        raw_margins = (games_df["w_score"] - games_df["l_score"]).to_numpy(dtype=float)
        y = np.minimum(raw_margins, float(self._margin_cap))

        # Build design matrix (vectorized scatter)
        game_indices = np.arange(n_games)
        X[game_indices, w_idx] = 1.0
        X[game_indices, l_idx] = -1.0

        # No intercept: only rating differences between teams are modelled.
        model = Ridge(alpha=self._ridge_lambda, fit_intercept=False)
        model.fit(X, y)
        ratings: list[float] = model.coef_.tolist()
        return pd.DataFrame({"team_id": teams, "ridge_rating": ratings})

    def compute_colley(self, games_df: pd.DataFrame) -> pd.DataFrame:
        """Compute Colley Matrix ratings (win/loss only, no margin).

        Solves ``C r = b`` with ``C[i, i] = 2 + games_i``,
        ``C[i, j] = -(games between i and j)`` and
        ``b[i] = 1 + (wins_i - losses_i) / 2``.

        Args:
            games_df: DataFrame with columns ``w_team_id``, ``l_team_id``
                (regular-season games only; scores not used).

        Returns:
            DataFrame with columns ``["team_id", "colley_rating"]``; the
            ratings average 0.5 across teams.
        """
        if games_df.empty:
            return pd.DataFrame(columns=["team_id", "colley_rating"])

        teams, _, w_idx, l_idx = _build_team_index(games_df)
        n = len(teams)

        # Build C and b (vectorized)
        C: npt.NDArray[np.float64] = np.zeros((n, n))
        np.add.at(C, (w_idx, w_idx), 1.0)  # diagonal: games played
        np.add.at(C, (l_idx, l_idx), 1.0)
        np.add.at(C, (w_idx, l_idx), -1.0)  # off-diagonal: games between pair
        np.add.at(C, (l_idx, w_idx), -1.0)
        C += 2.0 * np.eye(n)  # add 2 to diagonal per Colley formulation

        wins: npt.NDArray[np.float64] = np.zeros(n)
        losses: npt.NDArray[np.float64] = np.zeros(n)
        np.add.at(wins, w_idx, 1.0)
        np.add.at(losses, l_idx, 1.0)
        b = 1.0 + (wins - losses) / 2.0

        # Solve C r = b. C is 2*I plus the schedule Laplacian, so a singular
        # matrix is not expected; the lstsq branch is a defensive fallback.
        try:
            r: npt.NDArray[np.float64] = np.linalg.solve(C, b)  # type: ignore[assignment]
        except np.linalg.LinAlgError:
            logger.warning("Colley matrix is singular (disconnected schedule); using lstsq fallback")
            r, _, _, _ = np.linalg.lstsq(C, b, rcond=None)  # type: ignore[assignment]
        return pd.DataFrame({"team_id": teams, "colley_rating": r.tolist()})
def _build_team_index(
    games_df: pd.DataFrame,
) -> tuple[
    list[int],
    dict[int, int],
    npt.NDArray[np.intp],
    npt.NDArray[np.intp],
]:
    """Assign every team a dense 0-based position and index the games with it.

    Args:
        games_df: DataFrame with ``w_team_id`` and ``l_team_id`` columns.

    Returns:
        A 4-tuple ``(teams, idx, w_idx, l_idx)``:

        * ``teams`` - ascending list of every team ID seen as winner or loser.
        * ``idx`` - dict mapping a team ID to its position in ``teams``.
        * ``w_idx`` / ``l_idx`` - per-game positions of the winner and loser,
          as ``np.intp`` arrays aligned with the rows of ``games_df``.
    """
    winners = games_df["w_team_id"]
    losers = games_df["l_team_id"]

    # Union of both columns, de-duplicated before sorting.
    seen: set[int] = set(winners.tolist())
    seen.update(losers.tolist())
    teams: list[int] = sorted(seen)

    idx: dict[int, int] = dict(zip(teams, range(len(teams))))

    # Series.map performs the ID -> position lookup for a whole column at once.
    w_idx: npt.NDArray[np.intp] = winners.map(idx).to_numpy(dtype=np.intp)
    l_idx: npt.NDArray[np.intp] = losers.map(idx).to_numpy(dtype=np.intp)
    return teams, idx, w_idx, l_idx
def _build_srs_matrices(
    n: int,
    w_idx: npt.NDArray[np.intp],
    l_idx: npt.NDArray[np.intp],
    margins: npt.NDArray[np.float64],
) -> tuple[
    npt.NDArray[np.float64],
    npt.NDArray[np.int64],
    npt.NDArray[np.float64],
    npt.NDArray[np.float64],
]:
    """Assemble the per-team aggregates and opponent matrix used by SRS.

    Args:
        n: Number of teams (size of every returned array).
        w_idx: Per-game index of the winning team.
        l_idx: Per-game index of the losing team.
        margins: Per-game winner-minus-loser margin.

    Returns:
        ``(net_margin, n_games, avg_margin, A_norm)`` where ``net_margin[i]``
        is team i's summed signed margin, ``n_games[i]`` its game count,
        ``avg_margin[i]`` their ratio (0 for a team without games), and
        ``A_norm[i, j]`` the fraction of team i's games played against j.
    """
    # View every game twice: winner's perspective first, then loser's.
    team = np.concatenate([w_idx, l_idx])
    opponent = np.concatenate([l_idx, w_idx])
    signed_margin = np.concatenate([margins, -margins])

    # np.add.at accumulates correctly when a team index repeats.
    net_margin: npt.NDArray[np.float64] = np.zeros(n)
    np.add.at(net_margin, team, signed_margin)

    n_games: npt.NDArray[np.int64] = np.zeros(n, dtype=np.int64)
    np.add.at(n_games, team, 1)

    # Teams with no games divide by 1 instead of 0, leaving their rows at 0.
    divisor = np.maximum(n_games, 1)
    avg_margin = net_margin / divisor

    # A[i, j] counts the games between teams i and j (symmetric).
    A: npt.NDArray[np.float64] = np.zeros((n, n))
    np.add.at(A, (team, opponent), 1.0)

    # Scale each row by that team's game count.
    A_norm = A / divisor[:, np.newaxis]
    return net_margin, n_games, avg_margin, A_norm
# ---------------------------------------------------------------------------
# Module-level convenience functions (primary public API)
# ---------------------------------------------------------------------------
def compute_srs_ratings(
    games_df: pd.DataFrame,
    *,
    margin_cap: int = DEFAULT_MARGIN_CAP,
    max_iter: int = DEFAULT_SRS_MAX_ITER,
) -> pd.DataFrame:
    """Compute SRS ratings with a one-off :class:`BatchRatingSolver`.

    Args:
        games_df: DataFrame with columns ``w_team_id``, ``l_team_id``,
            ``w_score``, ``l_score`` (regular-season games only).
        margin_cap: Maximum point margin cap per game
            (default ``DEFAULT_MARGIN_CAP``).
        max_iter: Maximum SRS iterations (default 10,000); forwarded to the
            solver as ``srs_max_iter``.

    Returns:
        DataFrame with columns ``["team_id", "srs_rating"]``.
    """
    solver = BatchRatingSolver(margin_cap=margin_cap, srs_max_iter=max_iter)
    return solver.compute_srs(games_df)
def compute_ridge_ratings(
    games_df: pd.DataFrame,
    *,
    lam: float = DEFAULT_RIDGE_LAMBDA,
    margin_cap: int = DEFAULT_MARGIN_CAP,
) -> pd.DataFrame:
    """Compute Ridge regression ratings with a one-off :class:`BatchRatingSolver`.

    Args:
        games_df: DataFrame with columns ``w_team_id``, ``l_team_id``,
            ``w_score``, ``l_score`` (regular-season games only).
        lam: Ridge regularization parameter λ (default 20.0); forwarded to
            the solver as ``ridge_lambda``.
        margin_cap: Maximum point margin cap per game
            (default ``DEFAULT_MARGIN_CAP``).

    Returns:
        DataFrame with columns ``["team_id", "ridge_rating"]``.
    """
    solver = BatchRatingSolver(margin_cap=margin_cap, ridge_lambda=lam)
    return solver.compute_ridge(games_df)
def compute_colley_ratings(games_df: pd.DataFrame) -> pd.DataFrame:
    """Compute Colley Matrix win/loss-only ratings.

    Builds a :class:`BatchRatingSolver` with its default settings and
    delegates to its ``compute_colley`` method.

    Args:
        games_df: DataFrame with columns ``w_team_id``, ``l_team_id``
            (regular-season games only; scores not used).

    Returns:
        DataFrame with columns ``["team_id", "colley_rating"]``.
    """
    solver = BatchRatingSolver()
    return solver.compute_colley(games_df)