"""Large-loss pooling and the excess-over-threshold modeling hand-off.
Experience rating pools (caps) large losses so a single catastrophic claim does
not distort a group's experience. These helpers are the deterministic transform
that capping requires: ``pool_losses`` splits each loss into a pooled (retained)
portion and an excess portion at a pooling point, and ``excess_over_threshold``
emits the per-claim excess sample that feeds tail and aggregate modeling.
Terminology is line-agnostic: a "loss" is any claim amount; the "pooling point"
is the cap / retention. The ``excess_over_threshold`` output is the documented
hand-off to the modeling satellites (a GPD tail fit in ``extremeloss``, a
severity or aggregate model in ``lossmodels``, an aggregate simulation in
``risksim``). For descriptive large-claim flagging and concentration, see
``actuarialpy.claimants``.
"""
from __future__ import annotations
from collections.abc import Iterable
import numpy as np
import pandas as pd
from actuarialpy.columns import as_list, validate_columns
[docs]
def pool_losses(
df: pd.DataFrame,
loss_col: str,
pooling_point: float,
*,
pooled_col: str = "pooled_loss",
excess_col: str = "excess_loss",
copy: bool = True,
) -> pd.DataFrame:
"""Split each loss into a pooled (capped) portion and an excess portion.
``pooled = min(loss, pooling_point)`` is the retained amount used in the
group's experience; ``excess = max(loss - pooling_point, 0)`` is the portion
pooled across the block. Summing ``pooled_col`` by group gives capped
experience; summing ``excess_col`` gives the pooled excess. The input is
typically one row per claimant (e.g. the output of ``summarize_claimants``).
"""
validate_columns(df, [loss_col])
result = df.copy() if copy else df
result[pooled_col] = result[loss_col].clip(upper=pooling_point)
result[excess_col] = (result[loss_col] - pooling_point).clip(lower=0)
return result
[docs]
def excess_over_threshold(
df: pd.DataFrame,
loss_col: str,
threshold: float,
*,
keep_cols: str | Iterable[str] | None = None,
excess_col: str = "excess",
) -> pd.DataFrame:
"""Return losses strictly above ``threshold`` with their excess amount.
``excess = loss - threshold`` for rows where ``loss > threshold``. This is
the excess-over-threshold sample used to fit a tail (e.g. a generalized
Pareto distribution in ``extremeloss``) or a severity distribution in
``lossmodels``; the threshold is the EVT exceedance threshold / pooling
point. ``keep_cols`` carries identifier or covariate columns through.
"""
keep = as_list(keep_cols)
validate_columns(df, [loss_col] + keep)
above = df[df[loss_col] > threshold].copy()
result = above[keep + [loss_col]].copy()
result[excess_col] = above[loss_col] - threshold
return result.reset_index(drop=True)
def _retention_moments(outcomes):
"""Sort the outcome sample and precompute prefix sums for fast capped moments."""
x = np.sort(np.asarray(outcomes, dtype=float))
if x.size == 0:
raise ValueError("outcomes is empty")
csum = np.concatenate(([0.0], np.cumsum(x)))
csum2 = np.concatenate(([0.0], np.cumsum(x * x)))
return x, csum, csum2
def _retained_cv_at(sorted_x, csum, csum2, retention, n_units):
"""CV of the retained aggregate at one or more retention levels (vectorized)."""
n = sorted_x.size
u = np.asarray(retention, dtype=float)
k = np.searchsorted(sorted_x, u, side="right") # count of outcomes <= u
rem = n - k
mean = (csum[k] + rem * u) / n # E[min(X, u)]
e2 = (csum2[k] + rem * u * u) / n # E[min(X, u)^2]
var = np.maximum(e2 - mean * mean, 0.0)
with np.errstate(divide="ignore", invalid="ignore"):
cv = np.where(mean > 0, np.sqrt(var) / mean, np.nan)
return cv / np.sqrt(n_units)
[docs]
def retained_cv(outcomes, retention, *, n_units=1):
"""Coefficient of variation of the retained aggregate of ``n_units`` iid units.
Each unit's outcome is retained (capped) at ``retention`` -- ``min(outcome,
retention)`` -- and ``n_units`` such units are summed. For independent units
this CV is ``cv(min(X, retention)) / sqrt(n_units)``, where ``X`` is drawn from
the per-unit outcome sample ``outcomes`` (array-like). Capping discards
everything above ``retention``, so only the body of ``outcomes`` matters.
Parameters
----------
outcomes : array-like
Per-unit outcome sample (e.g. one value per member-year, claim, or risk).
retention : float or array-like
Cap applied to each unit. Scalar returns a float; an array returns the CV
at each retention.
n_units : int, default 1
Number of independent units in the aggregate.
Returns
-------
float or numpy.ndarray
Coefficient of variation of the retained aggregate.
"""
x, csum, csum2 = _retention_moments(outcomes)
cv = _retained_cv_at(x, csum, csum2, retention, n_units)
return float(cv) if np.ndim(retention) == 0 else cv
[docs]
def retention_for_target_cv(outcomes, n_units, target_cv, *, bounds=None, n_grid=256):
"""Retention at which the retained aggregate of ``n_units`` units hits a target CV.
Inverts :func:`retained_cv`. The single-unit retained CV increases with the
retention, so this solves ``retained_cv(outcomes, u, n_units=n_units) ==
target_cv`` for the retention ``u`` by interpolation over a grid spanning
``bounds`` (default ``min..max`` of ``outcomes``). Targets below or above the
achievable range clamp to the lower or upper bound. Holding ``target_cv`` fixed,
a larger ``n_units`` yields a higher retention (more independent units stabilize
the aggregate, so less needs to be capped) -- i.e. the basis for a size-graded
retention rule.
Parameters
----------
outcomes : array-like
Per-unit outcome sample.
n_units : int
Number of independent units in the aggregate.
target_cv : float
Desired coefficient of variation of the retained aggregate.
bounds : tuple(float, float), optional
``(lo, hi)`` retention search bounds. Defaults to the min and max of
``outcomes``.
n_grid : int, default 256
Number of grid points spanning ``bounds``.
Returns
-------
float
The retention level, clamped to ``bounds``.
"""
x, csum, csum2 = _retention_moments(outcomes)
lo = float(bounds[0]) if bounds is not None else float(x[0])
hi = float(bounds[1]) if bounds is not None else float(x[-1])
grid = np.linspace(lo, hi, int(n_grid))
cvx = np.maximum.accumulate(_retained_cv_at(x, csum, csum2, grid, n_units=1))
target_cvx = float(target_cv) * np.sqrt(n_units)
u = float(np.interp(target_cvx, cvx, grid))
return float(np.clip(u, lo, hi))