Source code for ncaa_eval.transform.graph
"""Graph-based centrality feature engineering for NCAA team schedules.
This module builds directed NetworkX graphs from season game results and computes
centrality-based features (PageRank, betweenness, HITS, clustering coefficient).
Graph semantics:
- Edges are directed loser → winner ("loser votes for winner quality", PageRank metaphor).
- Edge weight = min(margin, margin_cap) × optional recency multiplier.
- High PageRank ≡ transitively strong team (many high-quality wins flow toward you).
Architecture constraints:
- Pure transform-layer component: accepts pre-loaded DataFrames, does NOT load CSV files.
- No imports from ncaa_eval.ingest (Repository coupling is forbidden here).
- Caller is responsible for deduplicating games by (w_team_id, l_team_id, day_num)
before calling graph functions for any season with ESPN+Kaggle overlap.
Walk-forward usage (Story 4.7):
- Start with an empty DiGraph at the beginning of each season.
- Add games one-by-one in chronological order via GraphTransformer.add_game_to_graph().
- Call GraphTransformer.compute_features(graph, pagerank_init=prev_pagerank) after each game.
- Keep the PageRank scores from that result (team_id → pagerank) and pass them as
  pagerank_init on the next call for warm-start efficiency.
"""
from __future__ import annotations
import logging
import networkx as nx # type: ignore[import-untyped]
import pandas as pd # type: ignore[import-untyped]
from ncaa_eval.transform.constants import DEFAULT_MARGIN_CAP
logger = logging.getLogger(__name__)
# Module-level constants

# Default for the ``recency_window_days`` parameter of build_season_graph() and
# GraphTransformer: games with day_num >= reference_day - this value get boosted.
DEFAULT_RECENCY_WINDOW_DAYS: int = 20
# Default for the ``recency_multiplier`` parameter: factor applied to the capped
# margin of games that fall inside the recency window.
DEFAULT_RECENCY_MULTIPLIER: float = 1.5
# Default damping factor passed as ``alpha`` to nx.pagerank by compute_pagerank().
PAGERANK_ALPHA: float = 0.85
# Column names (in order) of the frame produced by GraphTransformer.compute_features();
# also used to build the empty result frame when there is nothing to compute.
_FEATURE_COLUMNS: list[str] = [
    "team_id",
    "pagerank",
    "betweenness_centrality",
    "hits_hub",
    "hits_authority",
    "clustering_coefficient",
]
[docs]
def build_season_graph(
    games_df: pd.DataFrame,
    margin_cap: int = DEFAULT_MARGIN_CAP,
    reference_day_num: int | None = None,
    recency_window_days: int = DEFAULT_RECENCY_WINDOW_DAYS,
    recency_multiplier: float = DEFAULT_RECENCY_MULTIPLIER,
) -> nx.DiGraph:
    """Turn one season of game results into a weighted loser → winner digraph.

    Every game contributes ``min(w_score - l_score, margin_cap)`` to the edge
    from its loser to its winner; that contribution is multiplied by
    ``recency_multiplier`` when the game's ``day_num`` is at or after
    ``reference_day - recency_window_days``. Contributions of repeated
    matchups with the same (loser, winner) orientation are summed into a
    single edge before the graph is built.

    Args:
        games_df: DataFrame with columns w_team_id, l_team_id, w_score, l_score, day_num.
        margin_cap: Upper bound applied to the margin of victory.
        reference_day_num: Anchor day for the recency window. When None, the
            largest day_num found in games_df is used.
        recency_window_days: Width of the recency window, counted back from the anchor day.
        recency_multiplier: Factor applied to the capped margin of games inside the window.

    Returns:
        nx.DiGraph whose edges run loser→winner and carry a "weight" attribute.
        An empty DiGraph is returned when games_df has no rows.
    """
    if games_df.empty:
        return nx.DiGraph()

    # Resolve the anchor day, then the first day that still counts as "recent".
    if reference_day_num is None:
        anchor_day = int(games_df["day_num"].max())
    else:
        anchor_day = reference_day_num
    window_start = anchor_day - recency_window_days

    # Capped margin of victory, as float so the multiplier can be fractional.
    score_gap = games_df["w_score"] - games_df["l_score"]
    capped_margin = score_gap.clip(upper=margin_cap).astype(float)

    # Keep the capped margin outside the window, boost it inside.
    is_recent = games_df["day_num"] >= window_start
    game_weight = capped_margin.where(~is_recent, capped_margin * recency_multiplier)

    # Collapse repeated (loser, winner) pairs into one edge by summing weights.
    weighted_games = games_df.assign(weight=game_weight)
    edges = weighted_games.groupby(["l_team_id", "w_team_id"], as_index=False)["weight"].sum()

    return nx.from_pandas_edgelist(
        edges,
        source="l_team_id",
        target="w_team_id",
        edge_attr="weight",
        create_using=nx.DiGraph,
    )
[docs]
def compute_pagerank(
    G: nx.DiGraph,
    alpha: float = PAGERANK_ALPHA,
    nstart: dict[int, float] | None = None,
) -> dict[int, float]:
    """Compute PageRank for each team in the graph.

    Captures transitive win-chain strength (2 hops vs. SoS 1 hop).
    Peer-reviewed NCAA validation: 71.6% vs. 64.2% naive win-ratio (Matthews et al. 2021).

    The warm-start vector is sanitised before it reaches NetworkX: it is
    re-keyed onto the nodes currently in ``G`` (teams missing from ``nstart``
    start at 0.0, stale teams are dropped), and if the resulting vector has no
    positive total mass (e.g. ``nstart`` is ``{}`` or covers none of the
    current nodes) the default uniform start is used instead. Without this,
    NetworkX would normalise by a zero sum.

    Args:
        G: Directed graph with loser→winner edges and "weight" attribute.
        alpha: Damping factor (teleportation probability = 1 - alpha).
        nstart: Optional warm-start dictionary (team_id → initial probability).
            Initialize with previous solution for 2–5 iterations instead of 30–50.

    Returns:
        dict mapping team_id → PageRank score. Empty dict if graph has no nodes.
    """
    if G.number_of_nodes() == 0:
        return {}

    start: dict[int, float] | None = None
    if nstart is not None:
        aligned = {node: float(nstart.get(node, 0.0)) for node in G}
        # "> 0.0" is also False for NaN totals, so a corrupted warm start
        # degrades to the uniform default rather than poisoning the result.
        if sum(aligned.values()) > 0.0:
            start = aligned

    return nx.pagerank(G, alpha=alpha, weight="weight", nstart=start)  # type: ignore[no-any-return]
[docs]
def compute_betweenness_centrality(G: nx.DiGraph) -> dict[int, float]:
    """Return the normalized betweenness centrality of every team node.

    Measures how often a team sits on shortest paths between other teams —
    a structural "bridge" signal, separate from PageRank (strength) and SoS
    (schedule quality). No weight attribute is passed to NetworkX here.

    Args:
        G: Directed graph.

    Returns:
        dict mapping team_id → betweenness centrality score. Empty dict if graph has no nodes.
    """
    if G.number_of_nodes() > 0:
        scores = nx.betweenness_centrality(G, normalized=True)
        return scores  # type: ignore[no-any-return]
    return {}
[docs]
def compute_hits(
    G: nx.DiGraph,
    max_iter: int = 100,
) -> tuple[dict[int, float], dict[int, float]]:
    """Return HITS hub and authority scores for every team node.

    Authority ≈ PageRank (r≈0.908 correlation). Hub = "quality schedule despite
    losses" — a distinct signal. Both come from one nx.hits() call.

    Args:
        G: Directed graph.
        max_iter: Iteration budget handed to nx.hits().

    Returns:
        Tuple of (hub_dict, authority_dict), each mapping team_id → score.
        Every node scores 0.0 in both dicts when the graph has no edges.
        Every node scores 1/n in both dicts when nx.hits() fails to converge
        (a warning is logged in that case).
    """
    # nx.hits raises ZeroDivisionError or returns zeros for no-edge graphs,
    # so answer that case directly.
    if G.number_of_edges() == 0:
        zero_hubs = {node: 0.0 for node in G.nodes()}
        zero_auths = {node: 0.0 for node in G.nodes()}
        return zero_hubs, zero_auths

    try:
        hub_scores, auth_scores = nx.hits(G, max_iter=max_iter)
    except nx.PowerIterationFailedConvergence:
        logger.warning("HITS failed to converge after %d iterations; returning uniform scores", max_iter)
        node_count = G.number_of_nodes()
        uniform = 1.0 / node_count if node_count > 0 else 0.0
        fallback_hubs: dict[int, float] = {node: uniform for node in G.nodes()}
        fallback_auths: dict[int, float] = {node: uniform for node in G.nodes()}
        return fallback_hubs, fallback_auths
    else:
        return hub_scores, auth_scores
[docs]
def compute_clustering_coefficient(G: nx.DiGraph) -> dict[int, float]:
    """Return the clustering coefficient of every team on the undirected schedule graph.

    Schedule diversity metric: low clustering = broad cross-conference scheduling.
    The digraph is converted to an undirected graph first, so a pair of teams
    that beat each other is a single connection rather than two.

    Args:
        G: Directed graph (an undirected copy is made internally).

    Returns:
        dict mapping team_id → clustering coefficient. Empty dict if graph has no nodes.
    """
    if G.number_of_nodes() > 0:
        undirected_view = G.to_undirected()
        return nx.clustering(undirected_view)  # type: ignore[no-any-return]
    return {}
[docs]
class GraphTransformer:
    """Turn game DataFrames into graph-based centrality features.

    Two update strategies are offered: batch (build the graph and compute the
    features in one call) and incremental (add_game_to_graph), the latter for
    walk-forward backtesting efficiency.

    Typical walk-forward usage (Story 4.7):
        transformer = GraphTransformer()
        graph = nx.DiGraph()
        prev_pagerank: dict[int, float] | None = None
        for game in chronological_games:
            transformer.add_game_to_graph(graph, ...)
            features_df = transformer.compute_features(graph, pagerank_init=prev_pagerank)
            prev_pagerank = dict(zip(features_df["team_id"], features_df["pagerank"]))
    """

    def __init__(
        self,
        margin_cap: int = DEFAULT_MARGIN_CAP,
        recency_window_days: int = DEFAULT_RECENCY_WINDOW_DAYS,
        recency_multiplier: float = DEFAULT_RECENCY_MULTIPLIER,
    ) -> None:
        # Edge-weight settings shared by build_graph() and add_game_to_graph().
        self._margin_cap = margin_cap
        self._recency_window_days = recency_window_days
        self._recency_multiplier = recency_multiplier

    def build_graph(
        self,
        games_df: pd.DataFrame,
        reference_day_num: int | None = None,
    ) -> nx.DiGraph:
        """Build a season graph using this transformer's weighting settings.

        Args:
            games_df: DataFrame with columns w_team_id, l_team_id, w_score, l_score, day_num.
            reference_day_num: Reference day for recency weighting. Defaults to max day_num.

        Returns:
            nx.DiGraph with loser→winner edges and "weight" attribute.
        """
        season_graph = build_season_graph(
            games_df,
            margin_cap=self._margin_cap,
            reference_day_num=reference_day_num,
            recency_window_days=self._recency_window_days,
            recency_multiplier=self._recency_multiplier,
        )
        return season_graph

    def add_game_to_graph(  # noqa: PLR0913 — graph construction has inherent dimensionality
        self,
        graph: nx.DiGraph,
        w_team_id: int,
        l_team_id: int,
        margin: int,
        day_num: int,
        reference_day_num: int,
    ) -> None:
        """Record one game on an existing graph, mutating it in place.

        Lets walk-forward code update the graph game by game instead of
        rebuilding it. The edge runs l_team_id → w_team_id (loser votes for
        winner); a repeated matchup adds to the existing edge weight.

        Args:
            graph: Existing nx.DiGraph to update in-place.
            w_team_id: Winner team ID.
            l_team_id: Loser team ID.
            margin: Margin of victory (absolute score difference).
            day_num: Day number of the game.
            reference_day_num: Reference day for recency window evaluation.
        """
        capped_margin = float(min(margin, self._margin_cap))
        window_start = reference_day_num - self._recency_window_days
        if day_num >= window_start:
            edge_weight = capped_margin * self._recency_multiplier
        else:
            edge_weight = capped_margin

        # Register winner, then loser, so both teams are nodes before the edge
        # is touched.
        for team_id in (w_team_id, l_team_id):
            if team_id not in graph:
                graph.add_node(team_id)

        if not graph.has_edge(l_team_id, w_team_id):
            graph.add_edge(l_team_id, w_team_id, weight=edge_weight)
            return
        # Repeated matchup with the same loser/winner: accumulate.
        graph[l_team_id][w_team_id]["weight"] += edge_weight

    def compute_features(
        self,
        graph: nx.DiGraph,
        pagerank_init: dict[int, float] | None = None,
    ) -> pd.DataFrame:
        """Compute every centrality feature for every team node in the graph.

        Args:
            graph: nx.DiGraph with loser→winner edges.
            pagerank_init: Optional warm-start dict (team_id → probability) from a previous
                compute_pagerank() call. Reduces PageRank iterations from ~30–50 to ~2–5.

        Returns:
            pd.DataFrame with columns ["team_id", "pagerank", "betweenness_centrality",
            "hits_hub", "hits_authority", "clustering_coefficient"], one row per team
            node, ordered by ascending team_id. A team missing from any score dict
            gets 0.0 for that feature. Returns an empty DataFrame with the same
            columns if the graph has no nodes.
        """
        if graph.number_of_nodes() == 0:
            return pd.DataFrame(columns=_FEATURE_COLUMNS)

        pagerank_scores = compute_pagerank(graph, nstart=pagerank_init)
        betweenness_scores = compute_betweenness_centrality(graph)
        hub_scores, authority_scores = compute_hits(graph)
        clustering_scores = compute_clustering_coefficient(graph)

        ordered_teams = sorted(graph.nodes())
        score_by_column = {
            "pagerank": pagerank_scores,
            "betweenness_centrality": betweenness_scores,
            "hits_hub": hub_scores,
            "hits_authority": authority_scores,
            "clustering_coefficient": clustering_scores,
        }

        # Insertion order of this dict fixes the column order of the frame.
        frame_data: dict[str, list] = {"team_id": ordered_teams}
        for column, scores in score_by_column.items():
            frame_data[column] = [scores.get(team, 0.0) for team in ordered_teams]
        return pd.DataFrame(frame_data)

    def transform(
        self,
        games_df: pd.DataFrame,
        reference_day_num: int | None = None,
        pagerank_init: dict[int, float] | None = None,
    ) -> pd.DataFrame:
        """Convenience wrapper: build the graph, then compute all centrality features.

        Args:
            games_df: DataFrame with columns w_team_id, l_team_id, w_score, l_score, day_num.
            reference_day_num: Reference day for recency weighting. Defaults to max day_num.
            pagerank_init: Optional warm-start dict for PageRank.

        Returns:
            pd.DataFrame with centrality features, one row per team.
            Returns empty DataFrame with correct columns if games_df is empty.
        """
        if not games_df.empty:
            season_graph = self.build_graph(games_df, reference_day_num=reference_day_num)
            return self.compute_features(season_graph, pagerank_init=pagerank_init)
        return pd.DataFrame(columns=_FEATURE_COLUMNS)