Source code for actuarialpy.decomposition

"""Frequency-severity and per-exposure trend decomposition.

Splits a per-exposure loss (pure premium) into its frequency and severity
drivers, and decomposes the change between two periods into a frequency effect and
a severity effect -- the standard "how much of the trend is frequency vs severity"
exhibit (a health shop's utilization vs unit cost). Decomposing requires a claim (or service) count alongside losses and
exposure.

Passing ``mix_by`` to :func:`decompose_per_exposure_trend` adds a third **mix** component
(frequency x severity x mix), separating the effect of the exposure composition
shifting across cells from genuine within-cell frequency and severity movement.
"""

from __future__ import annotations

from collections.abc import Iterable

import numpy as np
import pandas as pd

from actuarialpy.columns import as_list, validate_columns
from actuarialpy.metrics import frequency, per_exposure, safe_divide, severity


def frequency_severity_summary(
    df: pd.DataFrame,
    *,
    count_col: str,
    loss_col: str,
    exposure_col: str,
    groupby: str | Iterable[str] | None = None,
) -> pd.DataFrame:
    """Summarize claim frequency, severity, and per-exposure loss by group.

    The count, loss, and exposure columns are summed first (per ``groupby``
    group, or over the whole frame when ``groupby`` is omitted) and the rates
    are derived from those totals, so row-level rates are never averaged.

    The three derived columns are built from the same totals: ``frequency``
    from count and exposure (claims per exposure unit), ``severity`` from loss
    and count (loss per claim), and ``loss_per_exposure`` from loss and
    exposure (the pure premium), so ``loss_per_exposure == frequency *
    severity`` for every row.

    Args:
        df: Input data holding the count, loss, and exposure columns.
        count_col: Name of the claim (or service) count column.
        loss_col: Name of the loss amount column.
        exposure_col: Name of the exposure column.
        groupby: Optional column name(s) to aggregate by.

    Returns:
        A frame with the group keys (if any) followed by exposure, count,
        loss, ``frequency``, ``severity``, and ``loss_per_exposure``.
    """
    group_keys = as_list(groupby)
    measures = [count_col, loss_col, exposure_col]
    validate_columns(df, group_keys + measures)

    if not group_keys:
        # No grouping requested: collapse the whole frame to a single row.
        totals = pd.DataFrame({name: [df[name].sum()] for name in measures})
    else:
        grouped = df[group_keys + measures].groupby(group_keys, dropna=False, as_index=False)
        totals = grouped.sum(numeric_only=True)

    # Rates are derived only after aggregation.
    totals["frequency"] = frequency(totals[count_col], totals[exposure_col])
    totals["severity"] = severity(totals[loss_col], totals[count_col])
    totals["loss_per_exposure"] = per_exposure(totals[loss_col], totals[exposure_col])

    layout = [
        *group_keys,
        exposure_col,
        count_col,
        loss_col,
        "frequency",
        "severity",
        "loss_per_exposure",
    ]
    present = [name for name in layout if name in totals.columns]
    return totals[present]
def _logarithmic_mean(a: np.ndarray, b: np.ndarray) -> np.ndarray: """Elementwise logarithmic mean ``L(a, b) = (a - b) / (ln a - ln b)``, with ``L(a, a) = a``. Defined for strictly positive inputs. This is the weight kernel behind the LMDI (logarithmic mean Divisia index) decomposition, which reconciles exactly with no residual term. """ a = np.asarray(a, dtype=float) b = np.asarray(b, dtype=float) close = np.isclose(a, b) log_diff = np.where(close, 1.0, np.log(a) - np.log(b)) return np.where(close, a, (a - b) / log_diff) def _aggregate_cells(df: pd.DataFrame, keys: list[str], cols: list[str]) -> pd.DataFrame: """Sum ``cols`` over ``keys`` (or the whole frame when ``keys`` is empty).""" if keys: return df[keys + cols].groupby(keys, dropna=False, as_index=False).sum(numeric_only=True) return pd.DataFrame({col: [df[col].sum()] for col in cols}) def _lmdi_three_way( m0: np.ndarray, n0: np.ndarray, a0: np.ndarray, m1: np.ndarray, n1: np.ndarray, a1: np.ndarray, ) -> dict[str, float]: """LMDI frequency / severity / mix split for one reporting group. Each argument is an array over the mix cells: ``m`` exposure, ``n`` count, ``a`` dollars; suffix ``0`` prior and ``1`` current. Returns the multiplicative factors (``frequency_trend * severity_trend * mix_trend == loss_per_exposure_trend``) and the additive dollar effects (``frequency_effect + severity_effect + mix_effect == loss_per_exposure_change``); both exact. 
""" big_m0, big_m1 = m0.sum(), m1.sum() u0, c0, w0 = n0 / m0, a0 / n0, m0 / big_m0 u1, c1, w1 = n1 / m1, a1 / n1, m1 / big_m1 v0, v1 = a0 / big_m0, a1 / big_m1 # cell contribution to group per-exposure loss (== w*u*c) p0, p1 = float(v0.sum()), float(v1.sum()) # group per-exposure loss each period l_cell = _logarithmic_mean(v1, v0) l_tot = float(_logarithmic_mean(np.array([p1]), np.array([p0]))[0]) omega = l_cell / l_tot ln_u, ln_c, ln_w = np.log(u1 / u0), np.log(c1 / c0), np.log(w1 / w0) return { "loss_per_exposure_prior": p0, "loss_per_exposure_current": p1, "loss_per_exposure_trend": p1 / p0, "frequency_trend": float(np.exp(np.sum(omega * ln_u))), "severity_trend": float(np.exp(np.sum(omega * ln_c))), "mix_trend": float(np.exp(np.sum(omega * ln_w))), "loss_per_exposure_change": p1 - p0, "frequency_effect": float(np.sum(l_cell * ln_u)), "severity_effect": float(np.sum(l_cell * ln_c)), "mix_effect": float(np.sum(l_cell * ln_w)), } def _decompose_per_exposure_trend_mix( prior: pd.DataFrame, current: pd.DataFrame, *, count_col: str, loss_col: str, exposure_col: str, on: list[str], mix_by: str | Iterable[str], ) -> pd.DataFrame: """Three-way (frequency x severity x mix) per-exposure loss decomposition via LMDI.""" mix_keys = as_list(mix_by) overlap = [k for k in mix_keys if k in on] if overlap: raise ValueError( f"on and mix_by must be distinct dimensions; shared column(s): {overlap}. " "Mix is undefined when the mix dimension is also a reporting group." 
) cell_keys = on + mix_keys cols = [count_col, loss_col, exposure_col] validate_columns(prior, cell_keys + cols) validate_columns(current, cell_keys + cols) p_cells = _aggregate_cells(prior, cell_keys, cols) c_cells = _aggregate_cells(current, cell_keys, cols) if cell_keys: merged = p_cells.merge(c_cells, on=cell_keys, how="outer", suffixes=("_prior", "_current")) else: merged = pd.concat( [p_cells.add_suffix("_prior").reset_index(drop=True), c_cells.add_suffix("_current").reset_index(drop=True)], axis=1, ) period_cols = [f"{col}_{per}" for per in ("prior", "current") for col in cols] invalid = merged[period_cols].isna().any(axis=1) | (merged[period_cols] <= 0).any(axis=1) if bool(invalid.any()): shown = merged.loc[invalid, cell_keys] if cell_keys else merged.loc[invalid, period_cols] raise ValueError( "decompose_per_exposure_trend(mix_by=...) requires every mix cell to have positive " f"{exposure_col!r}, {count_col!r}, and {loss_col!r} in BOTH periods; the " "within-cell frequency x severity x mix split is undefined otherwise. " "Combine sparse cells or filter cells that enter/exit between periods. 
" f"Offending cell(s):\n{shown.to_string(index=False)}" ) e0, n0, l0 = f"{exposure_col}_prior", f"{count_col}_prior", f"{loss_col}_prior" e1, n1, l1 = f"{exposure_col}_current", f"{count_col}_current", f"{loss_col}_current" def _group_record(sub: pd.DataFrame) -> dict[str, float]: return _lmdi_three_way( sub[e0].to_numpy(), sub[n0].to_numpy(), sub[l0].to_numpy(), sub[e1].to_numpy(), sub[n1].to_numpy(), sub[l1].to_numpy(), ) records: list[dict] = [] if on: for group_vals, sub in merged.groupby(on, dropna=False, sort=False): group_vals = group_vals if isinstance(group_vals, tuple) else (group_vals,) records.append({**dict(zip(on, group_vals, strict=True)), **_group_record(sub)}) else: records.append(_group_record(merged)) out = pd.DataFrame(records) ordered = on + [ "loss_per_exposure_prior", "loss_per_exposure_current", "loss_per_exposure_trend", "frequency_trend", "severity_trend", "mix_trend", "loss_per_exposure_change", "frequency_effect", "severity_effect", "mix_effect", ] return out[[col for col in ordered if col in out.columns]]
def decompose_per_exposure_trend(
    prior: pd.DataFrame,
    current: pd.DataFrame,
    *,
    count_col: str,
    loss_col: str,
    exposure_col: str,
    on: str | Iterable[str] | None = None,
    mix_by: str | Iterable[str] | None = None,
) -> pd.DataFrame:
    """Decompose the per-exposure loss change from ``prior`` to ``current``.

    With ``mix_by`` omitted this is the two-way split: both frames are
    summarized with :func:`frequency_severity_summary` (optionally by the
    ``on`` keys), aligned, and the change reported two exact ways:

    - **Multiplicative trend**: ``loss_per_exposure_trend == frequency_trend *
      severity_trend``, where ``frequency_trend`` and ``severity_trend`` are
      the period-over-period ratios of frequency and severity.
    - **Additive dollars**: ``loss_per_exposure_change == frequency_effect +
      severity_effect`` via a symmetric (midpoint) split, so the contributions
      sum exactly to the per-exposure change.

    Pass ``mix_by`` (a column or list of columns) to add a third **mix**
    component. The per-exposure loss is then decomposed into frequency,
    severity, and the effect of the exposure composition shifting across the
    ``mix_by`` cells. Frequency and severity are measured *within* each cell
    (free of composition), and mix captures the aggregate movement that comes
    purely from the cell weights changing -- the piece the two-way otherwise
    misattributes to frequency and severity. The split uses the LMDI
    (logarithmic mean Divisia index) convention, which is order-free and
    reconciles exactly: ``loss_per_exposure_trend == frequency_trend *
    severity_trend * mix_trend`` and ``loss_per_exposure_change ==
    frequency_effect + severity_effect + mix_effect``.

    A list of columns in ``mix_by`` defines the cells as their cross -- one
    blended mix term, not a per-column attribution; to attribute mix to each
    dimension separately, run the decomposition once per dimension. ``on`` and
    ``mix_by`` are orthogonal: ``on`` groups the output rows, ``mix_by``
    defines the mix cells within each group. Every cell must have positive
    count, loss, and exposure in both periods.

    Args:
        prior: Prior-period data.
        current: Current-period data.
        count_col: Name of the claim (or service) count column.
        loss_col: Name of the loss amount column.
        exposure_col: Name of the exposure column.
        on: Optional column name(s) defining the output rows. Any iterable is
            accepted, including a one-shot iterator.
        mix_by: Optional column name(s) defining the mix cells; enables the
            three-way split.

    Returns:
        One row per ``on`` group (a single row when ``on`` is omitted). In the
        two-way case groups are aligned with an outer merge, so a group
        present in only one period is kept with missing values for the other.
    """
    keys = as_list(on)
    if mix_by is not None:
        return _decompose_per_exposure_trend_mix(
            prior,
            current,
            count_col=count_col,
            loss_col=loss_col,
            exposure_col=exposure_col,
            on=keys,
            mix_by=mix_by,
        )

    # Pass the materialized keys rather than ``on`` itself: ``on`` may be a
    # one-shot iterable that ``as_list`` has already consumed.
    group_arg = keys or None
    p = frequency_severity_summary(
        prior,
        count_col=count_col,
        loss_col=loss_col,
        exposure_col=exposure_col,
        groupby=group_arg,
    )
    c = frequency_severity_summary(
        current,
        count_col=count_col,
        loss_col=loss_col,
        exposure_col=exposure_col,
        groupby=group_arg,
    )

    keep = ["frequency", "severity", "loss_per_exposure"]
    if keys:
        merged = p[keys + keep].merge(c[keys + keep], on=keys, how="outer", suffixes=("_prior", "_current"))
    else:
        merged = pd.concat(
            [
                p[keep].add_suffix("_prior").reset_index(drop=True),
                c[keep].add_suffix("_current").reset_index(drop=True),
            ],
            axis=1,
        )

    # Multiplicative view: period-over-period ratios.
    merged["frequency_trend"] = safe_divide(merged["frequency_current"], merged["frequency_prior"])
    merged["severity_trend"] = safe_divide(merged["severity_current"], merged["severity_prior"])
    merged["loss_per_exposure_trend"] = safe_divide(
        merged["loss_per_exposure_current"], merged["loss_per_exposure_prior"]
    )

    # Additive view: weighting each driver's change by the other driver's
    # midpoint makes the two effects sum exactly to f1*s1 - f0*s0.
    freq_mean = (merged["frequency_prior"] + merged["frequency_current"]) / 2
    sev_mean = (merged["severity_prior"] + merged["severity_current"]) / 2
    merged["loss_per_exposure_change"] = merged["loss_per_exposure_current"] - merged["loss_per_exposure_prior"]
    merged["frequency_effect"] = (merged["frequency_current"] - merged["frequency_prior"]) * sev_mean
    merged["severity_effect"] = (merged["severity_current"] - merged["severity_prior"]) * freq_mean

    ordered = keys + [
        "loss_per_exposure_prior",
        "loss_per_exposure_current",
        "loss_per_exposure_trend",
        "frequency_trend",
        "severity_trend",
        "loss_per_exposure_change",
        "frequency_effect",
        "severity_effect",
        "frequency_prior",
        "frequency_current",
        "severity_prior",
        "severity_current",
    ]
    return merged[[col for col in ordered if col in merged.columns]]