"""Credibility models and primitives.
The credibility tools a pricing or experience-rating actuary reaches for: the
credibility-weighting primitive plus greatest-accuracy (Bühlmann and
Bühlmann-Straub) credibility models.
The ``Buhlmann`` and ``BuhlmannStraub`` models were previously part of the
``lossmodels`` package and were moved here, where credibility sits next to the
experience and ratemaking workflows that consume it.
"""
from __future__ import annotations
from statistics import NormalDist
from typing import Any
import numpy as np
import pandas as pd
_SCALAR_TYPES = (int, float, np.number)
[docs]
def credibility_weighted_estimate(observed: Any, complement: Any, z: Any) -> Any:
"""Blend an observed estimate with its complement at credibility ``z``.
Returns ``z * observed + (1 - z) * complement``. Scalar inputs return a
native ``float``; ``pandas.Series`` inputs return a ``Series`` with the index
preserved; other array-like inputs return a ``numpy.ndarray``. This is the
atomic credibility operation; the ``z`` may come from a model below, a filed
credibility formula, or any other source.
"""
if isinstance(observed, pd.Series) or isinstance(complement, pd.Series) or isinstance(z, pd.Series):
return z * observed + (1 - z) * complement
if isinstance(observed, _SCALAR_TYPES) and isinstance(complement, _SCALAR_TYPES) and isinstance(z, _SCALAR_TYPES):
return float(z * observed + (1 - z) * complement)
observed_arr = np.asarray(observed, dtype=float)
complement_arr = np.asarray(complement, dtype=float)
z_arr = np.asarray(z, dtype=float)
return z_arr * observed_arr + (1 - z_arr) * complement_arr
[docs]
class Buhlmann:
"""Bühlmann credibility model.
This implementation assumes each risk has the same number of observations.
Parameters
----------
overall_mean : float
Estimated collective mean.
epv : float
Estimated expected process variance (EPV).
vhm : float
Estimated variance of hypothetical means (VHM).
n_obs : int
Number of observations per risk.
"""
def __init__(self, overall_mean: float, epv: float, vhm: float, n_obs: int):
if n_obs <= 0:
raise ValueError("n_obs must be positive.")
if epv < 0:
raise ValueError("epv must be nonnegative.")
if vhm < 0:
raise ValueError("vhm must be nonnegative.")
self.overall_mean = float(overall_mean)
self.epv = float(epv)
self.vhm = float(vhm)
self.n_obs = int(n_obs)
@property
def k(self) -> float:
"""K = EPV / VHM. Returns infinity when VHM = 0."""
if self.vhm == 0:
return float("inf")
return self.epv / self.vhm
@property
def z(self) -> float:
"""Credibility factor ``Z = n / (n + K)``. Returns 0 when K is infinite."""
k = self.k
if not np.isfinite(k):
return 0.0
return self.n_obs / (self.n_obs + k)
[docs]
def premium(self, risk_mean: Any) -> Any:
"""Compute the Bühlmann credibility premium ``Z * risk_mean + (1 - Z) * overall_mean``.
Parameters
----------
risk_mean : float or array-like
Risk-specific sample mean(s).
Returns
-------
float or numpy.ndarray
Credibility-weighted premium(s).
"""
risk_mean = np.asarray(risk_mean, dtype=float)
premium = self.z * risk_mean + (1.0 - self.z) * self.overall_mean
return float(premium) if premium.ndim == 0 else premium
[docs]
@classmethod
def fit(cls, data: Any) -> Buhlmann:
"""Fit a Bühlmann credibility model from data.
Parameters
----------
data : array-like, shape (m, n)
Observations for m risks, each with n observations.
Returns
-------
Buhlmann
Fitted Bühlmann model.
Notes
-----
Estimators used:
- overall_mean = mean of all observations
- EPV = average of within-risk sample variances
- VHM = sample variance of risk means minus EPV / n, floored at 0
"""
data = np.asarray(data, dtype=float)
if data.ndim != 2:
raise ValueError("data must be a 2D array with shape (n_risks, n_obs).")
n_risks, n_obs = data.shape
if n_risks < 2:
raise ValueError("data must contain at least two risks.")
if n_obs < 2:
raise ValueError("each risk must have at least two observations.")
risk_means = np.mean(data, axis=1)
overall_mean = float(np.mean(data))
within_vars = np.var(data, axis=1, ddof=1)
epv = float(np.mean(within_vars))
between_var = float(np.var(risk_means, ddof=1))
vhm = max(between_var - epv / n_obs, 0.0)
return cls(overall_mean=overall_mean, epv=epv, vhm=vhm, n_obs=n_obs)
def __repr__(self) -> str:
return (
f"Buhlmann(overall_mean={self.overall_mean}, "
f"epv={self.epv}, vhm={self.vhm}, n_obs={self.n_obs})"
)
[docs]
class BuhlmannStraub:
"""Bühlmann-Straub credibility model.
This implementation allows different exposure weights by risk and period.
Parameters
----------
overall_mean : float
Estimated collective mean.
epv : float
Estimated expected process variance (EPV).
vhm : float
Estimated variance of hypothetical means (VHM).
weights : array-like
Total weight (exposure) for each risk.
"""
def __init__(self, overall_mean: float, epv: float, vhm: float, weights: Any):
weights = np.asarray(weights, dtype=float)
if weights.ndim != 1:
raise ValueError("weights must be a 1D array.")
if weights.size == 0:
raise ValueError("weights must not be empty.")
if np.any(weights <= 0):
raise ValueError("weights must be positive.")
if epv < 0:
raise ValueError("epv must be nonnegative.")
if vhm < 0:
raise ValueError("vhm must be nonnegative.")
self.overall_mean = float(overall_mean)
self.epv = float(epv)
self.vhm = float(vhm)
self.weights = weights
@property
def k(self) -> float:
"""K = EPV / VHM. Returns infinity when VHM = 0."""
if self.vhm == 0:
return float("inf")
return self.epv / self.vhm
[docs]
def z(self, weight: Any) -> Any:
"""Credibility factor for a given total risk weight: ``Z_i = w_i / (w_i + K)``.
Parameters
----------
weight : float or array-like
Total exposure weight(s).
Returns
-------
float or numpy.ndarray
Credibility factor(s).
"""
weight = np.asarray(weight, dtype=float)
if np.any(weight <= 0):
raise ValueError("weight must be positive.")
k = self.k
if not np.isfinite(k):
out = np.zeros_like(weight, dtype=float)
else:
out = weight / (weight + k)
return float(out) if out.ndim == 0 else out
[docs]
def premium(self, risk_mean: Any, weight: Any) -> Any:
"""Compute the Bühlmann-Straub premium ``Z_i * risk_mean_i + (1 - Z_i) * overall_mean``.
Parameters
----------
risk_mean : float or array-like
Risk-specific weighted mean(s).
weight : float or array-like
Total exposure weight(s).
Returns
-------
float or numpy.ndarray
Credibility-weighted premium(s).
"""
risk_mean = np.asarray(risk_mean, dtype=float)
z = self.z(weight)
premium = z * risk_mean + (1.0 - z) * self.overall_mean
return float(premium) if np.ndim(premium) == 0 else premium
@staticmethod
def _to_risk_lists(data: Any, weights: Any):
"""Normalize ``(data, weights)`` into lists of 1D arrays, one per risk.
Accepts a 2D array (equal period counts) or a sequence of 1D arrays per
risk (unequal period counts), validating shapes and positive weights.
"""
arr: Any
try:
arr = np.asarray(data, dtype=float)
except (ValueError, TypeError):
arr = None
if isinstance(arr, np.ndarray) and arr.ndim == 2:
warr = np.asarray(weights, dtype=float)
if warr.ndim != 2:
raise ValueError("weights must be a 2D array when data is 2D.")
if arr.shape != warr.shape:
raise ValueError("data and weights must have the same shape.")
if arr.shape[0] < 2:
raise ValueError("data must contain at least two risks.")
if arr.shape[1] < 2:
raise ValueError("each risk must have at least two periods.")
obs = [arr[i] for i in range(arr.shape[0])]
wts = [warr[i] for i in range(warr.shape[0])]
else:
obs = [np.asarray(x, dtype=float) for x in data]
wts = [np.asarray(w, dtype=float) for w in weights]
if len(obs) != len(wts):
raise ValueError("data and weights must have the same number of risks.")
if len(obs) < 2:
raise ValueError("data must contain at least two risks.")
for x, w in zip(obs, wts, strict=True):
if x.ndim != 1 or w.ndim != 1:
raise ValueError("each risk's data and weights must be 1D.")
if x.shape != w.shape:
raise ValueError("each risk's data and weights must have the same length.")
if x.size < 2:
raise ValueError("each risk must have at least two periods.")
allw = np.concatenate([np.ravel(w) for w in wts])
if np.any(allw <= 0):
raise ValueError("weights must be positive.")
return obs, wts
@staticmethod
def _estimate(risk_obs, risk_wts):
r"""General Bühlmann-Straub estimators on per-risk observation/weight lists.
For risks :math:`i` with observations :math:`X_{ij}` and weights
:math:`w_{ij}`, with :math:`m_i = \sum_j w_{ij}`,
:math:`\bar X_i = \sum_j w_{ij} X_{ij} / m_i`, :math:`m = \sum_i m_i`,
and :math:`\bar X = \sum_i m_i \bar X_i / m`:
.. math::
\hat s^2 = \frac{\sum_i \sum_j w_{ij}(X_{ij}-\bar X_i)^2}{\sum_i (n_i-1)},
\qquad
\hat a = \frac{\sum_i m_i(\bar X_i-\bar X)^2 - (r-1)\hat s^2}
{m - \sum_i m_i^2 / m}.
Handles unequal :math:`n_i`; :math:`\hat a` is floored at 0 (no credibility).
Returns ``(overall_mean, epv, vhm, m_i, xbar_i)``.
"""
r = len(risk_obs)
if r < 2:
raise ValueError("need at least two risks.")
m_i = np.array([float(w.sum()) for w in risk_wts])
xbar_i = np.array(
[float(np.sum(w * x) / w.sum()) for x, w in zip(risk_obs, risk_wts, strict=True)]
)
m = float(m_i.sum())
overall_mean = float(np.sum(m_i * xbar_i) / m)
ss_within = float(
sum(np.sum(w * (x - xb) ** 2) for x, w, xb in zip(risk_obs, risk_wts, xbar_i, strict=True))
)
dof = int(sum(len(x) - 1 for x in risk_obs))
if dof <= 0:
raise ValueError("need at least one risk with more than one observation.")
epv = ss_within / dof
between = float(np.sum(m_i * (xbar_i - overall_mean) ** 2))
denom = m - float(np.sum(m_i**2)) / m
vhm = max((between - (r - 1) * epv) / denom, 0.0) if denom > 0 else 0.0
return overall_mean, epv, vhm, m_i, xbar_i
[docs]
@classmethod
def fit(cls, data: Any, weights: Any) -> BuhlmannStraub:
r"""Fit a Bühlmann-Straub model from observations and weights.
Accepts either a 2D array (equal period counts) or a sequence of 1D
arrays per risk (unequal period counts). The estimators are the general
unbiased forms
.. math::
\hat s^2 = \frac{\sum_i\sum_j w_{ij}(X_{ij}-\bar X_i)^2}{\sum_i(n_i-1)},
\quad
\hat a = \frac{\sum_i m_i(\bar X_i-\bar X)^2 - (r-1)\hat s^2}
{m - \sum_i m_i^2/m},
with :math:`k=\hat s^2/\hat a` and :math:`Z_i=m_i/(m_i+k)`; a negative
:math:`\hat a` is floored at 0. For equal period counts this reduces to
the usual estimator; unlike a divide-by-mean-weight approximation it stays
unbiased when risks have different period counts or exposures.
Parameters
----------
data : array-like
Either shape ``(r, n)`` (equal periods) or a sequence of ``r`` 1D
arrays ``X_i`` whose lengths may differ.
weights : array-like
Exposure weights matching the shape/structure of ``data``.
"""
risk_obs, risk_wts = cls._to_risk_lists(data, weights)
overall_mean, epv, vhm, m_i, xbar_i = cls._estimate(risk_obs, risk_wts)
model = cls(overall_mean=overall_mean, epv=epv, vhm=vhm, weights=m_i)
model.groups_ = None
model.risk_means_ = xbar_i
return model
[docs]
@classmethod
def from_frame(
cls,
df,
*,
group: str,
value: str,
weight: str,
period: str | None = None,
) -> BuhlmannStraub:
"""Fit from long-format data: one row per (risk, period).
Parameters
----------
df : pandas.DataFrame
Long-format observations.
group, value, weight : str
Column names for the risk identifier, the per-unit observation (e.g.
loss per member-month), and the exposure weight.
period : str, optional
Period column; used only to order observations within a risk. The
number of observations per risk may differ.
Returns
-------
BuhlmannStraub
Fitted model with ``groups_`` (risk labels), ``risk_means_``, and
``weights`` (per-risk total exposure), all aligned to ``groups_``.
"""
required = [group, value, weight] + ([period] if period else [])
missing = [c for c in required if c not in df.columns]
if missing:
raise KeyError(f"columns not found in df: {missing}")
work = df[required].copy()
work[value] = work[value].astype(float)
work[weight] = work[weight].astype(float)
if np.any(work[weight].to_numpy() <= 0):
raise ValueError("weights must be positive.")
risk_obs, risk_wts, labels = [], [], []
for label, g in work.groupby(group, sort=True):
if period is not None:
g = g.sort_values(period)
risk_obs.append(g[value].to_numpy(dtype=float))
risk_wts.append(g[weight].to_numpy(dtype=float))
labels.append(label)
overall_mean, epv, vhm, m_i, xbar_i = cls._estimate(risk_obs, risk_wts)
model = cls(overall_mean=overall_mean, epv=epv, vhm=vhm, weights=m_i)
model.groups_ = pd.Index(labels, name=group)
model.risk_means_ = xbar_i
return model
def __repr__(self) -> str:
return (
f"BuhlmannStraub(overall_mean={self.overall_mean}, "
f"epv={self.epv}, vhm={self.vhm}, weights={self.weights})"
)
[docs]
def limited_fluctuation_z(exposure: Any, full_credibility_standard: float) -> Any:
"""Limited-fluctuation (classical) credibility factor -- the square-root rule.
Returns ``Z = min(1, sqrt(exposure / full_credibility_standard))``. ``exposure``
is the volume credibility is based on (claim counts, member months, life-years,
...) and ``full_credibility_standard`` is the amount of that volume required for
full (``Z = 1``) credibility -- often a filed value. Scalars return a native
``float``; ``pandas.Series`` inputs return a ``Series`` (index preserved); other
array-likes return a ``numpy.ndarray``, so credibility can be computed per group.
Feed the result to :func:`credibility_weighted_estimate` to blend experience with
its complement.
"""
if full_credibility_standard <= 0:
raise ValueError("full_credibility_standard must be positive.")
if isinstance(exposure, _SCALAR_TYPES):
return float(min(1.0, np.sqrt(max(float(exposure), 0.0) / full_credibility_standard)))
ratio_arr = np.asarray(exposure, dtype=float) / full_credibility_standard
z_arr = np.minimum(np.sqrt(np.clip(ratio_arr, 0.0, None)), 1.0)
if isinstance(exposure, pd.Series):
return pd.Series(z_arr, index=exposure.index, name=exposure.name)
return z_arr
[docs]
def full_credibility_claims(
*, confidence: float = 0.90, tolerance: float = 0.05, severity_cv: float | None = None
) -> float:
"""Classical full-credibility standard, in expected number of claims.
Returns the expected claim count for full credibility under the
limited-fluctuation model: ``(z / k) ** 2`` for claim frequency, where ``z`` is
the standard-normal quantile for two-sided ``confidence`` and ``k`` is the
``tolerance``. The classic 90% / 5% choice gives about 1082 claims. Supplying
``severity_cv`` (the coefficient of variation of individual claim severity)
inflates it to ``(z / k) ** 2 * (1 + severity_cv ** 2)`` for aggregate losses
rather than pure frequency.
Many shops use a filed standard instead; pass that straight to
:func:`limited_fluctuation_z`.
"""
if not 0.0 < confidence < 1.0:
raise ValueError("confidence must be between 0 and 1.")
if tolerance <= 0.0:
raise ValueError("tolerance must be positive.")
z = NormalDist().inv_cdf((1.0 + confidence) / 2.0)
standard = (z / tolerance) ** 2
if severity_cv is not None:
if severity_cv < 0.0:
raise ValueError("severity_cv must be non-negative.")
standard *= 1.0 + severity_cv**2
return standard