Source code for ratingmodels.relativity

r"""Rating relativities: lookup tables, one-way analysis, and GLM estimation.

Relativities (rating factors) scale a base rate up or down for a risk
characteristic (area, industry, age/sex band, plan tier, ...). They can be:

* supplied directly as a filed table (:class:`FactorTable`),
* estimated one-way as the ratio of each level's mean to the overall mean
  (:func:`one_way_relativities`), or
* estimated jointly with a generalized linear model
  (:class:`GLMRelativities`), which corrects for correlation between rating
  variables that one-way analysis cannot.

The GLM is fit by iteratively reweighted least squares (IRLS). With a log
link, :math:`\eta = X\beta + \text{offset}`, the working response and weights
are

.. math::
    z = \eta + (y - \mu)\,g'(\mu), \qquad
    w = \frac{w_{\text{prior}}}{V(\mu)\,g'(\mu)^2},

and :math:`\beta \leftarrow (X^\top W X)^{-1} X^\top W z` until convergence.
For the log link :math:`g'(\mu) = 1/\mu` and the variance function is
:math:`V(\mu) = \mu^p`: Poisson (:math:`p=1`), Gamma (:math:`p=2`), or
Tweedie (:math:`1 < p < 2`). A level's relativity is :math:`\exp(\beta)`
relative to the base (reference) level, whose relativity is 1.
"""
from __future__ import annotations

import warnings

from dataclasses import dataclass, field
from typing import Mapping, Sequence

import numpy as np
import pandas as pd


# --------------------------------------------------------------------------- #
# Filed / supplied factor tables
# --------------------------------------------------------------------------- #
[docs] @dataclass class FactorTable: """A named lookup of level -> multiplicative relativity. Parameters ---------- name : str Rating variable name (e.g. ``"area"``). factors : mapping Level -> relativity. The base level should map to 1.0 by convention. default : float Relativity returned for unknown levels. Default 1.0. """ name: str factors: Mapping default: float = 1.0 def lookup(self, level) -> float: return float(self.factors.get(level, self.default)) def apply(self, levels: Sequence) -> np.ndarray: return np.array([self.lookup(x) for x in levels], dtype=float)
[docs] def normalized(self, base_level) -> "FactorTable": """Rebase so ``base_level`` has relativity 1.0.""" base = self.lookup(base_level) if base <= 0: raise ValueError("base level relativity must be positive") return FactorTable( self.name, {k: v / base for k, v in self.factors.items()}, self.default / base, )
[docs] def one_way_relativities( data: pd.DataFrame, factor: str, response: str, exposure: str | None = None, base_level=None, ) -> pd.Series: r"""One-way relativities: each level's (exposure-weighted) mean / overall mean. Does not adjust for correlation with other rating variables; use :class:`GLMRelativities` when variables are correlated. """ if exposure is None: level_mean = data.groupby(factor)[response].mean() overall = data[response].mean() else: def _wm(g): return np.average(g[response], weights=g[exposure]) level_mean = data.groupby(factor).apply(_wm, include_groups=False) overall = np.average(data[response], weights=data[exposure]) rel = (level_mean / overall).rename("relativity") if base_level is not None: rel = rel / rel.loc[base_level] return rel
# --------------------------------------------------------------------------- # # GLM relativities via IRLS # --------------------------------------------------------------------------- # _VARIANCE_POWER = {"poisson": 1.0, "gamma": 2.0}
[docs] @dataclass class GLMRelativities: r"""GLM (log-link) relativity estimator fit by IRLS. Parameters ---------- family : {"poisson", "gamma", "tweedie"} Response distribution. ``"tweedie"`` requires ``var_power`` in (1, 2). var_power : float, optional Tweedie variance power :math:`p` in :math:`V(\mu)=\mu^p`. max_iter : int Maximum IRLS iterations. tol : float Convergence tolerance on the relative change in deviance. Attributes ---------- coefficients_ : pandas.Series Fitted :math:`\beta` including the intercept. relativities_ : dict[str, pandas.Series] Per-variable multiplicative relativities (base level = 1.0). base_value_ : float :math:`\exp(\text{intercept})`, the fitted base level. n_iter_ : int IRLS iterations used. deviance_ : float Final deviance. These attributes are populated by :meth:`fit`. """ family: str = "poisson" var_power: float | None = None max_iter: int = 100 tol: float = 1e-8 coefficients_: pd.Series = field(default=None, init=False, repr=False) relativities_: dict = field(default_factory=dict, init=False, repr=False) base_value_: float = field(default=None, init=False, repr=False) n_iter_: int = field(default=0, init=False, repr=False) deviance_: float = field(default=np.nan, init=False, repr=False) converged_: bool = field(default=False, init=False, repr=False) null_deviance_: float = field(default=np.nan, init=False, repr=False) pearson_chi2_: float = field(default=np.nan, init=False, repr=False) dispersion_: float = field(default=np.nan, init=False, repr=False) se_: pd.Series = field(default=None, init=False, repr=False) cov_params_: pd.DataFrame = field(default=None, init=False, repr=False) _design_info_: dict = field(default=None, init=False, repr=False) # ----- internals ----- # def _power(self) -> float: fam = self.family.lower() if fam in _VARIANCE_POWER: return _VARIANCE_POWER[fam] if fam == "tweedie": if self.var_power is None or not (1 < self.var_power < 2): raise ValueError("tweedie requires var_power in (1, 2)") return float(self.var_power) raise ValueError(f"unknown family {self.family!r}") def _build_design(self, data, predictors, base_levels=None, continuous=()): """One-hot encode predictors (dropping the base level); add intercept. The base (reference) level for each predictor is, in order of preference: the value supplied in ``base_levels``, otherwise the most populous level (the standard choice, giving the most stable intercept). ``continuous`` columns enter as numeric covariates unchanged. """ base_levels = dict(base_levels or {}) cols = {} chosen: dict = {} for var in predictors: cats = pd.Categorical(data[var]) levels = list(cats.categories) if not levels: raise ValueError(f"predictor {var!r} has no levels") if var in base_levels: base = base_levels[var] if base not in levels: raise ValueError( f"base level {base!r} not found among {var!r} levels" ) else: base = data[var].value_counts().idxmax() # modal level chosen[var] = base for lvl in levels: if lvl == base: continue cols[f"{var}::{lvl}"] = (cats == lvl).astype(float) for var in continuous: vals = data[var].to_numpy(dtype=float) if not np.all(np.isfinite(vals)): raise ValueError(f"continuous covariate {var!r} has non-finite values") cols[var] = vals X = pd.DataFrame(cols, index=data.index) X.insert(0, "Intercept", 1.0) return X, chosen
[docs] def fit( self, data: pd.DataFrame, response: str, predictors: Sequence[str], exposure: str | None = None, offset: str | None = None, weights: str | None = None, base_levels: Mapping[str, object] | None = None, continuous: Sequence[str] = (), ) -> "GLMRelativities": r"""Fit relativities for ``predictors`` against ``response``. ``exposure`` enters as a log offset (the natural choice for counts and pure premium). An explicit ``offset`` column (already on the log scale) and prior ``weights`` may also be supplied. ``base_levels`` maps a predictor to its reference level (relativity 1.0); unspecified predictors use their most populous level as the base. """ p = self._power() X_df, base_levels_used = self._build_design( data, list(predictors), base_levels, continuous=tuple(continuous) ) X = X_df.to_numpy(dtype=float) y = data[response].to_numpy(dtype=float) n, p_dim = X.shape eta_offset = np.zeros(n) if exposure is not None: expo = data[exposure].to_numpy(dtype=float) if np.any(expo <= 0): raise ValueError("exposure must be positive for the log offset") eta_offset = eta_offset + np.log(expo) if offset is not None: eta_offset = eta_offset + data[offset].to_numpy(dtype=float) prior_w = ( data[weights].to_numpy(dtype=float) if weights is not None else np.ones(n) ) if np.any(prior_w < 0): raise ValueError("weights must be non-negative") # initialise mu near the data, away from zero mu = np.maximum(y, np.mean(y) * 0.1 + 1e-3) eta = np.log(mu) beta = np.zeros(p_dim) dev_old = np.inf for it in range(1, self.max_iter + 1): # log link: g'(mu) = 1/mu -> z = eta + (y - mu)/mu z = eta + (y - mu) / mu - eta_offset # w = prior_w / (V(mu) * g'(mu)^2) = prior_w * mu^2 / mu^p w = prior_w * mu ** (2 - p) WX = X * w[:, None] xtwx = X.T @ WX xtwz = X.T @ (w * z) try: beta = np.linalg.solve(xtwx, xtwz) except np.linalg.LinAlgError: warnings.warn( "design matrix is rank deficient (aliased levels); " "using a least-squares solution", stacklevel=2, ) beta = np.linalg.lstsq(xtwx, xtwz, rcond=None)[0] eta = X @ beta + eta_offset eta = np.clip(eta, -30, 30) # guard against overflow mu = np.exp(eta) dev = self._deviance(y, mu, prior_w, p) if ( np.isfinite(dev) and np.isfinite(dev_old) and abs(dev - dev_old) <= self.tol * (abs(dev_old) + self.tol) ): self.n_iter_ = it self.converged_ = True break dev_old = dev else: self.n_iter_ = self.max_iter self.converged_ = False self.coefficients_ = pd.Series(beta, index=X_df.columns) self.deviance_ = float(dev_old if not np.isfinite(dev) else dev) self.base_value_ = float(np.exp(beta[0])) # ----- inference: Pearson dispersion and quasi-likelihood covariance ----- pearson = float(np.sum(prior_w * (y - mu) ** 2 / mu**p)) dof = max(n - p_dim, 1) self.pearson_chi2_ = pearson self.dispersion_ = pearson / dof try: cov = self.dispersion_ * np.linalg.inv(xtwx) self.cov_params_ = pd.DataFrame(cov, index=X_df.columns, columns=X_df.columns) self.se_ = pd.Series(np.sqrt(np.maximum(np.diag(cov), 0.0)), index=X_df.columns) except np.linalg.LinAlgError: self.cov_params_ = None self.se_ = None # ----- null deviance: intercept (+ offset) only ----- mu0 = np.full(n, max(np.sum(prior_w * y) / np.sum(prior_w), 1e-12)) if exposure is not None or offset is not None: # weighted-mean rate on the offset scale rate0 = np.sum(prior_w * y) / np.sum(prior_w * np.exp(eta_offset)) mu0 = max(rate0, 1e-12) * np.exp(eta_offset) self.null_deviance_ = self._deviance(y, mu0, prior_w, p) self._design_info_ = { "predictors": list(predictors), "base_levels": dict(base_levels_used), "continuous": list(continuous), "columns": list(X_df.columns), } # assemble relativities per variable (base level = 1.0) rels: dict[str, pd.Series] = {} for var in predictors: levels = [base_levels_used[var]] vals = [1.0] for name, coef in self.coefficients_.items(): if name.startswith(f"{var}::"): levels.append(name.split("::", 1)[1]) vals.append(float(np.exp(coef))) rels[var] = pd.Series(vals, index=levels, name=f"{var}_relativity") self.relativities_ = rels return self
@staticmethod def _deviance(y, mu, w, p) -> float: """Tweedie deviance (covers Poisson p=1 and Gamma p=2 in the limit).""" y = np.maximum(y, 0) eps = 1e-12 if abs(p - 1.0) < 1e-9: # Poisson term = np.where(y > 0, y * np.log((y + eps) / mu), 0.0) - (y - mu) return float(2 * np.sum(w * term)) if abs(p - 2.0) < 1e-9: # Gamma term = -np.log((y + eps) / mu) + (y - mu) / mu return float(2 * np.sum(w * term)) # general Tweedie, 1 < p < 2 a = np.where(y > 0, y ** (2 - p) / ((1 - p) * (2 - p)), 0.0) b = y * mu ** (1 - p) / (1 - p) c = mu ** (2 - p) / (2 - p) return float(2 * np.sum(w * (a - b + c)))
[docs] def predict( self, data: pd.DataFrame, exposure: str | None = None, offset: str | None = None, ) -> np.ndarray: """Predicted mean for new rows. Categorical levels unseen in fitting fall back to the base level (relativity 1.0). ``exposure`` multiplies the mean; ``offset`` is a column already on the log scale. """ if self.coefficients_ is None: raise RuntimeError("model is not fit") info = self._design_info_ beta = self.coefficients_ eta = np.full(len(data), float(beta.iloc[0]), dtype=float) for var in info["predictors"]: vals = data[var] for name, coef in beta.items(): if name.startswith(f"{var}::"): lvl = name.split("::", 1)[1] eta += float(coef) * (vals == lvl).to_numpy(dtype=float) for var in info["continuous"]: eta += float(beta[var]) * data[var].to_numpy(dtype=float) if offset is not None: eta += data[offset].to_numpy(dtype=float) eta = np.clip(eta, -30, 30) mu = np.exp(eta) if exposure is not None: mu *= data[exposure].to_numpy(dtype=float) return mu
[docs] def summary(self) -> pd.DataFrame: """Coefficient table: estimate, quasi-likelihood SE, z, relativity. Standard errors use the Pearson-estimated dispersion (quasi-likelihood / quasi-Poisson style), which is the robust default for pricing data where overdispersion is the norm. """ if self.coefficients_ is None: raise RuntimeError("model is not fit") out = pd.DataFrame({"coef": self.coefficients_}) if self.se_ is not None: out["se"] = self.se_ with np.errstate(divide="ignore", invalid="ignore"): out["z"] = out["coef"] / out["se"] out["relativity"] = np.exp(out["coef"]) return out