"""Game-by-game Elo rating engine for NCAA basketball feature engineering.
Computes Elo ratings as a **feature building block** — the resulting per-team
ratings feed into models (XGBoost, etc.) as input features. This module does
NOT implement model-level ``train``/``predict``/``save`` interfaces; those
belong in Story 5.3.
Key design points:
* ``update_game()`` returns the **before** ratings, then mutates internal
state, guaranteeing walk-forward temporal safety.
* Variable K-factor: early-season → regular-season → tournament.
* Margin-of-victory scaling with diminishing returns (Silver/SBCB formula).
* Home-court adjustment added to the home team's effective rating before
computing the expected outcome.
* Season mean-reversion toward conference mean (or global mean as fallback).
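
A minimal walk-forward sketch (team IDs and scores are illustrative):

>>> engine = EloFeatureEngine(EloConfig())
>>> engine.get_rating(1101)  # unseen teams start at the initial rating
1500.0
>>> before = engine.update_game(
...     w_team_id=1101, l_team_id=1102, w_score=78, l_score=70,
...     loc="H", is_tournament=False,
... )
>>> before  # ratings *before* the game, safe to use as features
(1500.0, 1500.0)
>>> engine.get_rating(1101) > 1500.0 > engine.get_rating(1102)
True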
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING
import pandas as pd # type: ignore[import-untyped]
from ncaa_eval.transform.serving import rescale_overtime
if TYPE_CHECKING:
from ncaa_eval.ingest.schema import Game
from ncaa_eval.transform.normalization import ConferenceLookup
logger = logging.getLogger(__name__)
# ── Configuration ────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class EloConfig:
"""Frozen configuration for the Elo feature engine.
All K-factor, margin scaling, home-court, and mean-reversion parameters
are configurable with sensible defaults matching the Silver/SBCB model.
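
A minimal sketch; the override values below are illustrative, not tuned
recommendations:

>>> EloConfig().k_regular
38.0
>>> EloConfig(k_regular=32.0, home_advantage_elo=4.0).home_advantage_elo
4.0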
"""
initial_rating: float = 1500.0
k_early: float = 56.0
k_regular: float = 38.0
k_tournament: float = 47.5
early_game_threshold: int = 20
margin_exponent: float = 0.85
max_margin: int = 25
home_advantage_elo: float = 3.5
mean_reversion_fraction: float = 0.25
# ── Engine ───────────────────────────────────────────────────────────────────
class EloFeatureEngine:
"""Game-by-game Elo rating engine.
Args:
config: Frozen Elo configuration.
conference_lookup: Optional conference lookup for season
mean-reversion. When ``None``, mean-reversion falls back to
global mean.
"""
def __init__(
self,
config: EloConfig,
conference_lookup: ConferenceLookup | None = None,
) -> None:
self._config = config
self._conference_lookup = conference_lookup
self._ratings: dict[int, float] = {}
self._game_counts: dict[int, int] = {}
# ── Public: core Elo math ────────────────────────────────────────────
@staticmethod
def expected_score(rating_a: float, rating_b: float) -> float:
"""Logistic expected score for team A against team B.
``expected = 1 / (1 + 10^((r_b − r_a) / 400))``
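
For example, evenly matched teams get 0.5, and a 100-point rating edge is
worth roughly 64%:

>>> EloFeatureEngine.expected_score(1500.0, 1500.0)
0.5
>>> round(EloFeatureEngine.expected_score(1600.0, 1500.0), 3)
0.64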
"""
exponent = (rating_b - rating_a) / 400.0
return 1.0 / (1.0 + float(10.0**exponent))
def get_rating(self, team_id: int) -> float:
"""Return current Elo rating for *team_id* (initial_rating if unseen)."""
return self._ratings.get(team_id, self._config.initial_rating)
def update_game( # noqa: PLR0913 — game data has inherent dimensionality
self,
w_team_id: int,
l_team_id: int,
w_score: int,
l_score: int,
loc: str,
is_tournament: bool,
*,
num_ot: int = 0,
) -> tuple[float, float]:
"""Process one game and update ratings.
Snapshots before-ratings for feature use, applies home-court
effective-rating adjustment to expected-score computation, computes
the margin-of-victory multiplier and variable K-factor, then mutates
internal rating state for both teams.
Args:
w_team_id: Winner team ID.
l_team_id: Loser team ID.
w_score: Winner final score (raw).
l_score: Loser final score (raw).
loc: ``"H"`` (winner home), ``"A"`` (winner away), ``"N"``
(neutral).
is_tournament: Whether this is a tournament game.
num_ot: Number of overtime periods (used for margin rescaling).
Returns:
Tuple of ``(elo_w_before, elo_l_before)`` — the winner's and
loser's ratings *before* this game's update, suitable for use
as walk-forward feature values.
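
Example (team IDs and scores are illustrative; both teams are unseen, so
the returned before-ratings equal ``initial_rating``):

>>> engine = EloFeatureEngine(EloConfig())
>>> engine.update_game(
...     w_team_id=1, l_team_id=2, w_score=85, l_score=80,
...     loc="N", is_tournament=True, num_ot=1,
... )
(1500.0, 1500.0)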
"""
r_w = self.get_rating(w_team_id)
r_l = self.get_rating(l_team_id)
# Snapshot before-ratings for feature use
elo_w_before = r_w
elo_l_before = r_l
# Apply home-court adjustment to effective ratings for expected calc
eff_r_w = r_w
eff_r_l = r_l
if loc == "H":
# Winner is home — boost their effective rating
eff_r_w += self._config.home_advantage_elo
elif loc == "A":
# Winner is away → loser is home — boost the loser's effective rating
eff_r_l += self._config.home_advantage_elo
expected_w = self.expected_score(eff_r_w, eff_r_l)
expected_l = 1.0 - expected_w
# Rescale OT scores for margin calculation
adj_w = rescale_overtime(w_score, num_ot)
adj_l = rescale_overtime(l_score, num_ot)
margin = int(round(adj_w - adj_l))
mult = self._margin_multiplier(margin)
k_w = self._effective_k(w_team_id, is_tournament)
k_l = self._effective_k(l_team_id, is_tournament)
k_eff_w = k_w * mult
k_eff_l = k_l * mult
# actual: 1.0 for winner, 0.0 for loser
self._ratings[w_team_id] = r_w + k_eff_w * (1.0 - expected_w)
self._ratings[l_team_id] = r_l + k_eff_l * (0.0 - expected_l)
# Increment game counts
self._game_counts[w_team_id] = self._game_counts.get(w_team_id, 0) + 1
self._game_counts[l_team_id] = self._game_counts.get(l_team_id, 0) + 1
return (elo_w_before, elo_l_before)
# ── Public: season management ────────────────────────────────────────
def apply_season_mean_reversion(self, season: int) -> None:
"""Regress each team toward its conference mean (or global mean).
Groups all rated teams by conference via ``ConferenceLookup``,
computes each conference's mean rating, then shifts every team's
rating a fraction ``mean_reversion_fraction`` of the way toward its
conference mean. Teams with no conference entry fall back to the
global mean; when no ``ConferenceLookup`` is provided, all teams use the
global mean. This is a no-op when no prior ratings exist.
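
Worked example with no ``ConferenceLookup`` (team IDs are illustrative;
the global mean is 1600, so each team moves 25% of the way toward it):

>>> engine = EloFeatureEngine(EloConfig(mean_reversion_fraction=0.25))
>>> engine.set_ratings({1: 1700.0, 2: 1500.0})
>>> engine.apply_season_mean_reversion(season=2025)
>>> engine.get_all_ratings()
{1: 1675.0, 2: 1525.0}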
"""
if not self._ratings:
return
fraction = self._config.mean_reversion_fraction
global_mean = sum(self._ratings.values()) / len(self._ratings)
if self._conference_lookup is None:
# Regress all toward global mean
for tid in self._ratings:
self._ratings[tid] = self._ratings[tid] + fraction * (global_mean - self._ratings[tid])
return
# Group teams by conference
conf_teams: dict[str, list[int]] = {}
no_conf: list[int] = []
for tid in self._ratings:
conf = self._conference_lookup.get(season, tid)
if conf is not None:
conf_teams.setdefault(conf, []).append(tid)
else:
no_conf.append(tid)
# Compute conference means
conf_means: dict[str, float] = {}
for conf, tids in conf_teams.items():
conf_means[conf] = sum(self._ratings[t] for t in tids) / len(tids)
# Regress toward conference mean
for conf, tids in conf_teams.items():
cm = conf_means[conf]
for tid in tids:
self._ratings[tid] = self._ratings[tid] + fraction * (cm - self._ratings[tid])
# Teams without conference info: regress toward global mean
for tid in no_conf:
self._ratings[tid] = self._ratings[tid] + fraction * (global_mean - self._ratings[tid])
def reset_game_counts(self) -> None:
"""Reset per-team game counts for a new season (affects variable K)."""
self._game_counts.clear()
def start_new_season(self, season: int) -> None:
"""Orchestrate season transition: mean-reversion then reset counts."""
self.apply_season_mean_reversion(season)
self.reset_game_counts()
# ── Public: snapshot / bulk ───────────────────────────────────────────
def has_ratings(self) -> bool:
"""Return ``True`` if at least one team has a rating."""
return bool(self._ratings)
def set_ratings(self, ratings: dict[int, float]) -> None:
"""Replace all ratings with *ratings*."""
self._ratings = dict(ratings)
def set_game_counts(self, counts: dict[int, int]) -> None:
"""Replace all game counts with *counts*."""
self._game_counts = dict(counts)
def get_game_counts(self) -> dict[int, int]:
"""Return a copy of the current game-counts dict."""
return dict(self._game_counts)
def get_all_ratings(self) -> dict[int, float]:
"""Return a copy of the current ratings dict."""
return dict(self._ratings)
def predict_matchup(self, team_a_id: int, team_b_id: int) -> float:
"""Return P(team_a wins) using the Elo expected-score formula."""
r_a = self.get_rating(team_a_id)
r_b = self.get_rating(team_b_id)
return self.expected_score(r_a, r_b)
def process_season(self, games: list[Game], season: int) -> pd.DataFrame:
"""Process all games for a season, returning before-ratings per game.
Calls ``start_new_season(season)`` if prior ratings exist (i.e., this
is not the very first season).
Args:
games: Games sorted in chronological order.
season: Season year.
Returns:
DataFrame with columns ``[game_id, elo_w_before, elo_l_before]``.
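
Sketch of the intended call pattern (assuming ``games`` holds one season's
``Game`` rows already sorted chronologically)::

    frame = engine.process_season(games, season=2025)
    # one row per game: game_id, elo_w_before, elo_l_before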
"""
if not games:
return pd.DataFrame(columns=["game_id", "elo_w_before", "elo_l_before"])
if self._ratings:
self.start_new_season(season)
rows: list[dict[str, object]] = []
for game in games:
elo_w, elo_l = self.update_game(
w_team_id=game.w_team_id,
l_team_id=game.l_team_id,
w_score=game.w_score,
l_score=game.l_score,
loc=game.loc,
is_tournament=game.is_tournament,
num_ot=game.num_ot,
)
rows.append(
{
"game_id": game.game_id,
"elo_w_before": elo_w,
"elo_l_before": elo_l,
}
)
return pd.DataFrame(rows)
# ── Private helpers ──────────────────────────────────────────────────
def _effective_k(self, team_id: int, is_tournament: bool) -> float:
"""Determine K-factor based on game count and tournament flag.
Tournament games always use ``k_tournament``; otherwise the team's
internal game count selects ``k_early`` (below ``early_game_threshold``)
or ``k_regular``.
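For example, with the default config a team's 5th game of a season uses
``k_early`` (56.0), its 25th uses ``k_regular`` (38.0), and any tournament
game uses ``k_tournament`` (47.5).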
"""
if is_tournament:
return self._config.k_tournament
game_count = self._game_counts.get(team_id, 0)
if game_count < self._config.early_game_threshold:
return self._config.k_early
return self._config.k_regular
def _margin_multiplier(self, margin: int) -> float:
"""Compute margin-of-victory multiplier: ``min(margin, max)^exponent``.
A floor of 1 is applied before exponentiation so that an OT-rescaled
margin that rounds to zero (near-tie) still produces a non-zero rating
update, consistent with the Silver/SBCB formula intent.
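For example, with the defaults a 10-point margin yields ``10 ** 0.85``
(about 7.1), while a 40-point blowout is capped at ``25 ** 0.85`` (about
15.4).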
"""
capped = min(abs(margin), self._config.max_margin)
floored = max(1, capped)
return float(floored**self._config.margin_exponent)