# Source code for ncaa_eval.ingest.sync

"""Sync engine for fetching NCAA data and persisting it with smart caching.

`SyncEngine` orchestrates data retrieval from configured connectors (Kaggle,
ESPN) and stores results via a `Repository`. Parquet-level caching prevents
redundant fetches on subsequent runs.
"""

from __future__ import annotations

import dataclasses
import logging
from pathlib import Path

import pandas as pd  # type: ignore[import-untyped]

from ncaa_eval.ingest.connectors.espn import EspnConnector
from ncaa_eval.ingest.connectors.kaggle import KaggleConnector
from ncaa_eval.ingest.fuzzy import fuzzy_match_team
from ncaa_eval.ingest.repository import Repository

logger = logging.getLogger(__name__)

# ESPN location names that aren't covered by MTeamSpellings.csv.  These are
# persistent ESPN abbreviations/nicknames that no generic fuzzy algorithm
# can safely resolve without false positives (e.g. "App State" would match
# "Iowa State" at 88 via partial_ratio; "minnesota" would match at 100 for
# "St. Thomas-Minnesota").  Add entries here when the sync log reports
# unmatched ESPN locations.
# Keys are the exact (case-sensitive) ESPN ``location`` strings as they appear
# in cbbpy's team map; values are Kaggle TeamIDs.
_ESPN_LOCATION_OVERRIDES: dict[str, int] = {
    "App State": 1111,  # ESPN abbrev.; Kaggle: "Appalachian St"
    "UL Monroe": 1419,  # ESPN abbrev.; Kaggle: "ULM"
    "St. Thomas-Minnesota": 1472,  # ESPN hyphenated; Kaggle: "St Thomas MN"
}


def _build_espn_team_map(year: int, spellings: dict[str, int]) -> dict[str, int]:
    """Build ESPN location-name → Kaggle TeamID mapping via cbbpy's bundled team map.

    cbbpy ships a `mens_team_map.csv` that lists every D-I team per season
    with the `location` name ESPN uses internally (e.g. `"UC Santa Barbara"`,
    `"Florida Gulf Coast"`).  Using these location names as the keys in
    `team_name_to_id` means:

    * `_fetch_per_team` queries cbbpy with *exact* ESPN names → no wrong
      fuzzy match (avoids `"california-santa-barbara"` → `"California"`).
    * The schedule DataFrame's `team`/`opponent` columns also use these
      location names → `_resolve_team_id` can do direct dict lookups.

    Each ESPN location is resolved to a Kaggle ID by exact lookup in the
    Kaggle spellings dict (lowercased).  A token-set-ratio fuzzy fallback
    handles any locations not covered by the spellings.

    Falls back to the latest available season in the map if *year* is absent
    (e.g. cbbpy hasn't published the current season's map yet).

    Args:
        year: Season to build the map for (falls back to latest if absent).
        spellings: Lowercased Kaggle alternate spelling → TeamID mapping.

    Returns:
        ESPN location name → Kaggle TeamID for every resolvable location.
    """
    import cbbpy  # type: ignore[import-untyped]  # local import; no stubs

    map_path = Path(cbbpy.__file__).parent / "utils" / "mens_team_map.csv"
    df: pd.DataFrame = pd.read_csv(map_path)

    available: set[int] = {int(s) for s in df["season"].unique()}
    if year not in available:
        fallback = max(available)
        logger.info("espn: season %d not in cbbpy team map; using %d", year, fallback)
        year = fallback

    season_df: pd.DataFrame = df[df["season"] == year]
    result: dict[str, int] = {}
    unmatched: list[str] = []

    # Iterate the Series directly; no need to materialize an intermediate list.
    for location in season_df["location"].astype(str):
        # Explicit overrides take priority (handles ESPN abbreviations that
        # can't be safely resolved by generic fuzzy matching).  Single .get()
        # instead of a `in` test followed by a second `[]` lookup.
        override = _ESPN_LOCATION_OVERRIDES.get(location)
        if override is not None:
            result[location] = override
            continue
        # Exact (lowercased) lookup in the Kaggle spellings file first; fuzzy
        # fallback only for locations the spellings don't cover.
        kaggle_id: int | None = spellings.get(location.lower())
        if kaggle_id is None:
            kaggle_id = fuzzy_match_team(location, spellings)
        if kaggle_id is not None:
            result[location] = kaggle_id
        else:
            unmatched.append(location)

    if unmatched:
        logger.warning(
            "espn: %d ESPN locations could not be matched to a Kaggle team: %s%s",
            len(unmatched),
            unmatched[:5],
            " ..." if len(unmatched) > 5 else "",
        )

    return result


@dataclasses.dataclass
class SyncResult:
    """Summary of a single source sync operation.

    Only ``source`` is required; the counters default to zero so a sync
    method can increment them incrementally as work is performed.
    """

    # Which connector produced this result (e.g. "kaggle" or "espn").
    source: str
    # Entity counts persisted during this run.
    teams_written: int = 0
    seasons_written: int = 0
    games_written: int = 0
    # Season partitions skipped because their cache was already fresh.
    seasons_cached: int = 0
class SyncEngine:
    """Orchestrates data sync from external sources into the local repository.

    Args:
        repository: Repository instance used for reading and writing data.
        data_dir: Root directory for local Parquet files and cached CSVs.
    """

    def __init__(self, repository: Repository, data_dir: Path) -> None:
        # Repository handles all Parquet reads/writes; data_dir is also used
        # directly for cache-existence checks and ESPN marker files.
        self._repo = repository
        self._data_dir = data_dir

    def _espn_marker(self, year: int) -> Path:
        """Return the path of the ESPN sync marker file for *year*."""
        return self._data_dir / f".espn_synced_{year}"

    def sync_kaggle(self, force_refresh: bool = False) -> SyncResult:
        """Sync NCAA data from Kaggle with Parquet-level caching.

        Downloads CSVs (if not cached) and converts them to Parquet.  Skips
        individual entities whose Parquet files already exist, unless
        *force_refresh* is ``True``.

        Args:
            force_refresh: Bypass all caches and re-fetch everything.

        Returns:
            SyncResult summarising teams/seasons/games written and cached.
        """
        result = SyncResult(source="kaggle")
        connector = KaggleConnector(extract_dir=self._data_dir / "kaggle")
        connector.download(force=force_refresh)  # CSV-level cache

        # Teams: Parquet-level cache
        teams_path = self._data_dir / "teams.parquet"
        if force_refresh or not teams_path.exists():
            teams = connector.fetch_teams()
            self._repo.save_teams(teams)
            result.teams_written = len(teams)
            logger.info("[kaggle] teams: %d written", len(teams))
        else:
            logger.info("[kaggle] teams: cache hit, skipped")

        # Seasons: Parquet-level cache with new-season detection.
        # After a Kaggle download, the CSV may contain seasons not yet in
        # the cached parquet (e.g. 2026 added after a previous 2025 sync).
        # Always compare CSV seasons against the cache to avoid silently
        # missing new data.
        seasons_path = self._data_dir / "seasons.parquet"
        if force_refresh or not seasons_path.exists():
            seasons = connector.fetch_seasons()
            self._repo.save_seasons(seasons)
            result.seasons_written = len(seasons)
            logger.info("[kaggle] seasons: %d written", len(seasons))
        else:
            cached_seasons = self._repo.get_seasons()
            csv_seasons = connector.fetch_seasons()
            cached_years = {s.year for s in cached_seasons}
            csv_years = {s.year for s in csv_seasons}
            new_years = csv_years - cached_years
            if new_years:
                logger.info(
                    "[kaggle] seasons: new seasons detected in CSV: %s — updating cache",
                    sorted(new_years),
                )
                self._repo.save_seasons(csv_seasons)
                seasons = csv_seasons
                # Only the genuinely new seasons count as "written".
                result.seasons_written = len(new_years)
            else:
                seasons = cached_seasons
                logger.info("[kaggle] seasons: cache hit, skipped")

        # Games: per-season Parquet-level cache.  The partition path must
        # mirror the repository's Hive-style layout for the cache check to
        # be meaningful — NOTE(review): confirm against Repository.save_games.
        for season in seasons:
            game_path = self._data_dir / "games" / f"season={season.year}" / "data.parquet"
            if not force_refresh and game_path.exists():
                result.seasons_cached += 1
                logger.info("[kaggle] season %d: cache hit, skipped", season.year)
                continue
            games = connector.fetch_games(season.year)
            self._repo.save_games(games)
            result.games_written += len(games)
            logger.info("[kaggle] season %d: %d games written", season.year, len(games))
        return result

    def sync_espn(self, force_refresh: bool = False) -> SyncResult:
        """Sync the most recent season's games from ESPN.

        Requires Kaggle data to be synced first (needs team and season
        mappings).  Uses a marker-file cache: if ``.espn_synced_{year}``
        exists the season is considered up-to-date unless *force_refresh*.
        ESPN games are merged with existing Kaggle games for the same season
        partition before saving (because ``save_games`` overwrites).

        Args:
            force_refresh: Bypass marker-file cache and re-fetch from ESPN.

        Returns:
            SyncResult summarising games written and seasons cached.

        Raises:
            RuntimeError: Kaggle data has not been synced yet.
        """
        result = SyncResult(source="espn")
        teams = self._repo.get_teams()
        seasons = self._repo.get_seasons()
        if not teams or not seasons:
            raise RuntimeError(
                "ESPN sync requires Kaggle data to be synced first. "
                "Run: python sync.py --source kaggle --dest <path>"
            )

        # Load DayZero mapping and alternate spellings from already-downloaded Kaggle CSVs.
        kaggle_connector = KaggleConnector(extract_dir=self._data_dir / "kaggle")
        season_day_zeros = kaggle_connector.load_day_zeros()
        spellings = kaggle_connector.fetch_team_spellings()

        # ESPN scope: most recent season only
        year = max(s.year for s in seasons)

        # Build ESPN location → Kaggle ID mapping using cbbpy's authoritative
        # team list.  This ensures _fetch_per_team passes exact ESPN location
        # names to cbbpy (no wrong internal fuzzy matches).
        team_name_to_id = _build_espn_team_map(year, spellings)

        # Cache check via marker file
        marker = self._espn_marker(year)
        if not force_refresh and marker.exists():
            result.seasons_cached += 1
            logger.info("[espn] season %d: cache hit, skipped", year)
            return result
        # Drop the stale marker up front so a failed re-fetch doesn't leave
        # the season looking freshly synced.
        if force_refresh and marker.exists():
            marker.unlink()

        connector = EspnConnector(
            team_name_to_id=team_name_to_id,
            season_day_zeros=season_day_zeros,
        )
        espn_games = connector.fetch_games(year)

        # Merge with existing Kaggle games: save_games() overwrites the partition.
        existing_games = self._repo.get_games(year)
        all_games = existing_games + espn_games
        self._repo.save_games(all_games)
        result.games_written = len(espn_games)
        logger.info("[espn] season %d: %d games written", year, len(espn_games))

        # Mark season as synced
        self._data_dir.mkdir(parents=True, exist_ok=True)
        marker.touch()
        return result

    def sync_all(self, force_refresh: bool = False) -> list[SyncResult]:
        """Sync all configured sources: Kaggle first, then ESPN.

        Args:
            force_refresh: Bypass caches for all sources.

        Returns:
            List of SyncResult, one per source (kaggle, espn).
        """
        kaggle_result = self.sync_kaggle(force_refresh)
        espn_result = self.sync_espn(force_refresh)
        return [kaggle_result, espn_result]