"""Parallel cross-validation backtest orchestrator.
Provides :func:`run_backtest`, which executes walk-forward cross-validation
folds in parallel using ``joblib.Parallel``. Each fold trains an independent
deep-copied model instance, generates predictions on tournament games, and
computes evaluation metrics. Results are aggregated into a
:class:`BacktestResult` containing per-fold details and a summary DataFrame.
"""
from __future__ import annotations
import copy
import dataclasses
import logging
import math
import time
import types
from collections.abc import Callable, Mapping, Sequence
from typing import Literal
import joblib # type: ignore[import-untyped]
import numpy as np
import numpy.typing as npt
import pandas as pd # type: ignore[import-untyped]
from rich.console import Console
from rich.table import Table
from ncaa_eval.evaluation.metrics import (
MetricFn,
brier_score,
expected_calibration_error,
get_metric,
list_metrics,
log_loss,
roc_auc,
)
from ncaa_eval.evaluation.splitter import CVFold, walk_forward_splits
from ncaa_eval.model.base import Model, StatefulModel
from ncaa_eval.transform.feature_serving import StatefulFeatureServer
logger = logging.getLogger(__name__)
# Metadata columns that must be stripped before feeding stateless models.
METADATA_COLS: frozenset[str] = frozenset(
    "game_id season day_num date team_a_id team_b_id is_tournament "
    "loc_encoding team_a_won w_score l_score num_ot".split()
)
# Immutable mapping of the four built-in evaluation metrics, keyed by canonical
# name. Wrapped in MappingProxyType so callers cannot mutate the defaults.
# NOTE(review): run_backtest resolves its defaults via default_metrics() (the
# full registry, including user-registered metrics), not this constant —
# presumably kept for external callers; confirm before removing.
DEFAULT_METRICS: Mapping[
    str,
    Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], float],
] = types.MappingProxyType(
    {
        "log_loss": log_loss,
        "brier_score": brier_score,
        "roc_auc": roc_auc,
        "ece": expected_calibration_error,
    }
)
def default_metrics() -> dict[str, MetricFn]:
    """Return all registered metric functions (built-in + user-registered).

    Returns:
        Fresh dict mapping each registered metric name (from ``list_metrics``)
        to its callable (from ``get_metric``).
    """
    # NOTE: a stray Sphinx "[docs]" cross-reference artifact preceded this
    # function in the scraped source; it was a NameError at import and is removed.
    return {name: get_metric(name) for name in list_metrics()}
def feature_cols(df: pd.DataFrame) -> list[str]:
    """Return feature column names (everything not in METADATA_COLS).

    Args:
        df: DataFrame whose columns are inspected.

    Returns:
        List of column names that are not metadata, in original column order.
    """
    return [c for c in df.columns if c not in METADATA_COLS]
def _randomize_team_assignment(df: pd.DataFrame, seed: int = 42) -> pd.DataFrame:
    """Randomly swap team_a/team_b for ~50% of rows to balance binary labels.

    The feature server assigns ``team_a = winner`` for every game, so
    ``team_a_won`` is always ``True``. Stateless classifiers require at least
    two classes in the training labels; this function randomly re-assigns which
    team is "a" so roughly half the rows become ``team_a_won = False``.

    Paired ``_a`` / ``_b`` feature columns are swapped; ``delta_*`` and
    ``seed_diff`` columns are negated; ``loc_encoding`` is negated.

    Args:
        df: Feature DataFrame from ``StatefulFeatureServer.serve_season_features``.
        seed: RNG seed for reproducibility.

    Returns:
        Copy of ``df`` with team assignment randomly swapped for ~50% of rows.
    """
    flip = pd.Series(
        np.random.default_rng(seed).random(len(df)) < 0.5, index=df.index
    )
    swapped = df.copy()

    def _swap(col_x: str, col_y: str) -> None:
        # Exchange the two columns' values on the flipped rows only.
        held = swapped.loc[flip, col_x].copy()
        swapped.loc[flip, col_x] = swapped.loc[flip, col_y]
        swapped.loc[flip, col_y] = held

    _swap("team_a_id", "team_b_id")
    # Invert the label for flipped rows: the original winner is now team_b.
    swapped.loc[flip, "team_a_won"] = ~swapped.loc[flip, "team_a_won"].astype(bool)
    # Direction-dependent scalars change sign when the teams trade places.
    for name in swapped.columns:
        if name.startswith("delta_") or name in ("seed_diff", "loc_encoding"):
            swapped.loc[flip, name] = -swapped.loc[flip, name]
    # Swap paired _a / _b feature columns (e.g. srs_a <-> srs_b), once per stem.
    seen: set[str] = set()
    for name in list(swapped.columns):
        if not name.endswith("_a") or name in ("team_a_id", "team_a_won"):
            continue
        stem = name[:-2]
        partner = f"{stem}_b"
        if partner in swapped.columns and stem not in seen:
            seen.add(stem)
            _swap(name, partner)
    return swapped
@dataclasses.dataclass(frozen=True)
class FoldResult:
    """Result of evaluating a single cross-validation fold.

    Attributes:
        year: The test season year for this fold.
        predictions: Predicted probabilities for tournament games.
        actuals: Actual binary outcomes for tournament games.
        metrics: Mapping of metric name to computed value.
        elapsed_seconds: Wall-clock time for the fold evaluation.
        test_game_ids: Game IDs from the test fold (aligned to predictions).
        test_team_a_ids: team_a IDs from the test fold.
        test_team_b_ids: team_b IDs from the test fold.
    """

    year: int
    predictions: pd.Series
    actuals: pd.Series
    metrics: Mapping[str, float]
    elapsed_seconds: float
    # The ID fields default to empty Series (via default_factory, since
    # pd.Series is mutable) so pre-existing call sites that omit them keep working.
    test_game_ids: pd.Series = dataclasses.field(
        default_factory=lambda: pd.Series(dtype=object),
    )
    test_team_a_ids: pd.Series = dataclasses.field(
        default_factory=lambda: pd.Series(dtype="int64"),
    )
    test_team_b_ids: pd.Series = dataclasses.field(
        default_factory=lambda: pd.Series(dtype="int64"),
    )
@dataclasses.dataclass(frozen=True)
class BacktestResult:
    """Aggregated result of a full backtest across all folds.

    Attributes:
        fold_results: Per-fold evaluation results, sorted by year.
        summary: DataFrame with year as index, metric columns + elapsed_seconds.
        elapsed_seconds: Total wall-clock time for the entire backtest.
    """

    fold_results: tuple[FoldResult, ...]
    summary: pd.DataFrame
    elapsed_seconds: float
def _evaluate_fold(
    fold: CVFold,
    model: Model,
    metric_fns: Mapping[
        str,
        Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], float],
    ],
) -> FoldResult:
    """Train model on fold.train, predict on fold.test, compute metrics.

    Args:
        fold: A single CV fold with train/test DataFrames.
        model: A deep-copied model instance (caller is responsible for copying).
        metric_fns: Mapping of metric name to callable(y_true, y_prob) returning float.

    Returns:
        FoldResult with predictions, actuals, computed metrics, and timing.
    """
    t0 = time.perf_counter()
    if fold.test.empty:
        # No tournament games to score: every metric is NaN.
        return FoldResult(
            year=fold.year,
            predictions=pd.Series(dtype=np.float64),
            actuals=pd.Series(dtype=np.float64),
            metrics=dict.fromkeys(metric_fns, float("nan")),
            elapsed_seconds=time.perf_counter() - t0,
            test_game_ids=pd.Series(dtype=object),
            test_team_a_ids=pd.Series(dtype="int64"),
            test_team_b_ids=pd.Series(dtype="int64"),
        )
    # Randomize test-fold team assignment: feature server always assigns
    # team_a = winner, so y_test would be all 1s without this, making
    # roc_auc undefined. Use a distinct seed from the train randomisation.
    test_df = _randomize_team_assignment(fold.test, seed=43)
    actuals = test_df["team_a_won"].astype(np.float64)
    cols = feature_cols(fold.train)
    if isinstance(model, StatefulModel):
        # Stateful models consume the full (metadata-bearing) frames directly.
        model.fit(fold.train, fold.train["team_a_won"].astype(np.float64))
        probs = model.predict_proba(test_df)
    else:
        # Balance labels: feature server assigns team_a = winner always,
        # so team_a_won is always True. Randomise assignment before fitting.
        train_df = _randomize_team_assignment(fold.train)
        # Drop all-NaN columns so sklearn estimators that reject NaN can fit.
        cols = [c for c in cols if not fold.train[c].isna().all()]
        model.fit(train_df[cols], train_df["team_a_won"].astype(np.float64))
        probs = model.predict_proba(test_df[cols])
    y_true = actuals.to_numpy()
    y_prob = probs.to_numpy().astype(np.float64)
    return FoldResult(
        year=fold.year,
        predictions=probs,
        actuals=actuals,
        metrics={name: fn(y_true, y_prob) for name, fn in metric_fns.items()},
        elapsed_seconds=time.perf_counter() - t0,
        test_game_ids=test_df["game_id"].reset_index(drop=True),
        test_team_a_ids=test_df["team_a_id"].reset_index(drop=True),
        test_team_b_ids=test_df["team_b_id"].reset_index(drop=True),
    )
def _build_summary(
    results: Sequence[FoldResult],
    metric_names: Sequence[str],
) -> pd.DataFrame:
    """Build the per-fold summary DataFrame: year index, metric cols, elapsed_seconds."""
    rows: list[dict[str, object]] = []
    for r in results:
        row: dict[str, object] = {"year": r.year}
        for metric_name in metric_names:
            row[metric_name] = r.metrics.get(metric_name, float("nan"))
        row["elapsed_seconds"] = r.elapsed_seconds
        rows.append(row)
    return pd.DataFrame(rows).set_index("year")


def _render_results_table(
    results: Sequence[FoldResult],
    metric_names: Sequence[str],
) -> Table:
    """Render per-fold metrics as a Rich table (one row per fold year)."""
    table = Table(title="Backtest Results")
    table.add_column("Year", style="cyan")
    for metric_name in metric_names:
        table.add_column(metric_name, style="green")
    table.add_column("Time (s)", style="yellow")
    for r in results:
        cells = [str(r.year)]
        for metric_name in metric_names:
            val = r.metrics.get(metric_name, float("nan"))
            cells.append(f"{val:.4f}" if not math.isnan(val) else "NaN")
        cells.append(f"{r.elapsed_seconds:.2f}")
        table.add_row(*cells)
    return table


def run_backtest(  # noqa: PLR0913 — REFACTOR Story 8.1
    model: Model,
    feature_server: StatefulFeatureServer,
    *,
    seasons: Sequence[int],
    mode: Literal["batch", "stateful"] = "batch",
    n_jobs: int = -1,
    metric_fns: Mapping[
        str,
        Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], float],
    ]
    | None = None,
    console: Console | None = None,
    progress: bool = False,
) -> BacktestResult:
    """Run parallelized walk-forward cross-validation backtest.

    Args:
        model: Model instance to evaluate (will be deep-copied per fold).
        feature_server: Configured feature server for building CV folds.
        seasons: Season years to include (passed to walk_forward_splits).
        mode: Feature serving mode (``"batch"`` or ``"stateful"``).
        n_jobs: Number of parallel workers. -1 = all cores, 1 = sequential.
        metric_fns: Metric functions to compute per fold. Defaults to all
            registered metrics (built-in + user-registered) as returned by
            :func:`default_metrics`.
        console: Rich Console for progress output.
        progress: Display a tqdm progress bar for fold evaluation.
            Most useful with ``n_jobs=1`` (sequential execution).

    Returns:
        BacktestResult with per-fold results and summary DataFrame.

    Raises:
        ValueError: If ``mode`` is not ``"batch"`` or ``"stateful"``, or if
            ``seasons`` contains fewer than 2 elements (propagated from
            :func:`walk_forward_splits`).
    """
    # Runtime guard: Literal["batch","stateful"] enforces at static-analysis
    # time; this check also protects callers who bypass mypy (e.g. YAML config).
    if mode not in ("batch", "stateful"):
        msg = f"mode must be 'batch' or 'stateful', got {mode!r}"
        raise ValueError(msg)
    resolved_metrics = default_metrics() if metric_fns is None else dict(metric_fns)
    total_start = time.perf_counter()
    # Materialize folds eagerly (generators can't be pickled for joblib)
    folds = list(walk_forward_splits(seasons, feature_server, mode=mode))
    # Deep-copy model per fold to avoid shared-state corruption
    models = [copy.deepcopy(model) for _ in folds]
    _console = console or Console()
    _console.print(f"Running backtest: {len(folds)} folds, n_jobs={n_jobs}")
    # Dispatch fold evaluation (with optional tqdm progress bar)
    if progress and n_jobs != 1:
        import warnings

        warnings.warn(
            "progress=True is only supported with n_jobs=1; progress bar skipped for parallel execution.",
            UserWarning,
            stacklevel=2,
        )
    if progress and n_jobs == 1:
        from tqdm.auto import tqdm  # type: ignore[import-untyped]

        results: list[FoldResult] = [
            _evaluate_fold(fold, m, resolved_metrics)
            for fold, m in tqdm(
                zip(folds, models),
                total=len(folds),
                desc="Backtest folds",
            )
        ]
    else:
        results = joblib.Parallel(n_jobs=n_jobs)(
            joblib.delayed(_evaluate_fold)(fold, m, resolved_metrics) for fold, m in zip(folds, models)
        )
    # Sort by year ascending (joblib may return out of order)
    results.sort(key=lambda r: r.year)
    total_elapsed = time.perf_counter() - total_start
    metric_names = list(resolved_metrics)
    summary = _build_summary(results, metric_names)
    # Progress report via Rich table
    _console.print(_render_results_table(results, metric_names))
    _console.print(f"Total backtest time: {total_elapsed:.2f}s")
    return BacktestResult(
        fold_results=tuple(results),
        summary=summary,
        elapsed_seconds=total_elapsed,
    )