r"""Fitting under left truncation and right censoring (deductibles and limits).
Real claim data rarely arrives ground-up and complete. A deductible ``d``
left-truncates: losses below ``d`` never enter the data, and each observed
loss is a draw from ``X | X > d``. A policy limit right-censors: when the
payment hits its maximum we know only that the ground-up loss was *at least*
the censoring point.
For observation :math:`i` with ground-up value :math:`x_i`, truncation point
:math:`t_i` (0 when none) and censoring indicator :math:`\delta_i`
(:math:`\delta_i = 0` for a censored observation, where :math:`x_i` is the
censoring point), the individual-data log-likelihood (Loss Models, ch. 11) is
.. math::
\ell(\theta) \;=\; \sum_i \Big[\, \delta_i \log f(x_i)
+ (1 - \delta_i) \log S(x_i) - \log S(t_i) \,\Big],
where :math:`S = 1 - F` is the survival function. Fitting a complete-data
likelihood to truncated or censored values is biased -- often badly -- which
is exactly why this module exists.
:func:`payments_to_ground_up` converts the common per-payment layout
(payments net of a deductible, capped at a maximum payment) into the
``(values, truncation, censored)`` triple every fitter in
:mod:`lossmodels.estimation` accepts.
"""
import numpy as np
from scipy.optimize import minimize
_TINY = 1e-300
def _prepare(values, truncation=None, censored=None):
"""Validate and broadcast (values, truncation, censored).
Returns
-------
tuple of np.ndarray
``(values, truncation, censored)`` with ``values > 0`` ground-up
amounts, ``truncation >= 0`` per-observation left-truncation points
(0 = untruncated), and boolean ``censored`` flags (True = the value is
a right-censoring point, i.e. a lower bound on the loss).
"""
values = np.asarray(values, dtype=float)
if values.ndim != 1 or values.size == 0:
raise ValueError("values must be a nonempty 1D array.")
if np.any(~np.isfinite(values)) or np.any(values <= 0):
raise ValueError("values must be finite and strictly positive.")
if truncation is None:
truncation = np.zeros_like(values)
else:
truncation = np.broadcast_to(
np.asarray(truncation, dtype=float), values.shape
).copy()
if np.any(~np.isfinite(truncation)) or np.any(truncation < 0):
raise ValueError("truncation must be finite and nonnegative.")
if censored is None:
censored = np.zeros(values.shape, dtype=bool)
else:
censored = np.broadcast_to(np.asarray(censored, dtype=bool), values.shape).copy()
if np.any(values < truncation):
raise ValueError(
"each value must be at least its truncation point (values are "
"ground-up amounts observed only when the loss exceeds the "
"deductible)."
)
if np.any(~censored & (values <= truncation)):
raise ValueError(
"uncensored values must strictly exceed their truncation point."
)
return values, truncation, censored
[docs]
def payments_to_ground_up(payments, deductible=0.0, max_payment=None, rtol=1e-9):
"""Convert per-payment data to ``(values, truncation, censored)``.
The per-payment convention: with deductible ``d`` and maximum payment
``m`` (the policy limit net of the deductible), the recorded payment is
``Y = min(X - d, m)`` for a ground-up loss ``X > d``.
Parameters
----------
payments : array-like
Positive payment amounts.
deductible : float or array-like
Deductible per observation (scalar broadcasts). Default 0.
max_payment : float or array-like, optional
Maximum payment per observation. Payments within relative tolerance
``rtol`` of the maximum are flagged as right-censored.
rtol : float
Relative tolerance for detecting capped payments.
Returns
-------
tuple of np.ndarray
``(values, truncation, censored)`` on the ground-up scale, ready for
any fitter in :mod:`lossmodels.estimation`.
"""
payments = np.asarray(payments, dtype=float)
if payments.ndim != 1 or payments.size == 0:
raise ValueError("payments must be a nonempty 1D array.")
if np.any(~np.isfinite(payments)) or np.any(payments <= 0):
raise ValueError("payments must be finite and strictly positive.")
deductible = np.broadcast_to(np.asarray(deductible, dtype=float), payments.shape)
if np.any(deductible < 0):
raise ValueError("deductible must be nonnegative.")
values = payments + deductible
truncation = deductible.copy()
if max_payment is None:
censored = np.zeros(payments.shape, dtype=bool)
else:
max_payment = np.broadcast_to(
np.asarray(max_payment, dtype=float), payments.shape
)
if np.any(max_payment <= 0):
raise ValueError("max_payment must be positive.")
if np.any(payments > max_payment * (1.0 + rtol)):
raise ValueError("payments cannot exceed max_payment.")
censored = payments >= max_payment * (1.0 - rtol)
return values, truncation, censored
def _survival(model, x):
"""S(x) = 1 - F(x), vectorized with a per-element fallback."""
x = np.asarray(x, dtype=float)
try:
f = np.asarray(model.cdf(x), dtype=float)
if f.shape != x.shape:
raise ValueError("non-vectorized result")
except Exception:
f = np.array([model.cdf(float(v)) for v in x], dtype=float)
return 1.0 - f
def _density(model, x):
x = np.asarray(x, dtype=float)
try:
f = np.asarray(model.pdf(x), dtype=float)
if f.shape != x.shape:
raise ValueError("non-vectorized result")
except Exception:
f = np.array([model.pdf(float(v)) for v in x], dtype=float)
return f
[docs]
def censored_log_likelihood(model, values, truncation=None, censored=None) -> float:
r"""Log-likelihood under left truncation and right censoring.
Implements :math:`\sum_i [\delta_i \log f(x_i) + (1-\delta_i)\log S(x_i)
- \log S(t_i)]`. With no truncation and no censoring this reduces to the
ordinary log-likelihood. Requires ``model.pdf`` and ``model.cdf``.
Returns ``-inf`` when the model assigns nonpositive density or survival
where the data require it (e.g. a support violation).
"""
values, truncation, censored = _prepare(values, truncation, censored)
if not hasattr(model, "pdf") or not hasattr(model, "cdf"):
raise TypeError("model must implement pdf(x) and cdf(x).")
ll = 0.0
unc = ~censored
if np.any(unc):
f = _density(model, values[unc])
if np.any(~np.isfinite(f)) or np.any(f <= 0):
return float(-np.inf)
ll += float(np.sum(np.log(np.maximum(f, _TINY))))
if np.any(censored):
s = _survival(model, values[censored])
if np.any(~np.isfinite(s)) or np.any(s <= 0):
return float(-np.inf)
ll += float(np.sum(np.log(np.maximum(s, _TINY))))
positive_t = truncation > 0
if np.any(positive_t):
s_t = _survival(model, truncation[positive_t])
if np.any(~np.isfinite(s_t)) or np.any(s_t <= 0):
return float(-np.inf)
ll -= float(np.sum(np.log(np.maximum(s_t, _TINY))))
return float(ll)
[docs]
def fit_mle_censored(
model_class,
values,
initial_params,
bounds=None,
truncation=None,
censored=None,
):
"""Generic numerical MLE under left truncation and right censoring.
The censored-and-truncated analogue of
:func:`lossmodels.estimation.fit_mle`: ``model_class(*params)`` must
provide ``pdf`` and ``cdf``. Uses L-BFGS-B (with ``bounds``) refined by
Nelder-Mead if the gradient-based step fails to improve.
Returns
-------
object
Fitted ``model_class`` instance.
"""
values, truncation, censored = _prepare(values, truncation, censored)
initial_params = np.asarray(initial_params, dtype=float)
if initial_params.size == 0:
raise ValueError("initial_params must not be empty.")
def neg_ll(params):
try:
model = model_class(*params)
except Exception:
return 1e300
ll = censored_log_likelihood(model, values, truncation, censored)
return 1e300 if not np.isfinite(ll) else -ll
result = minimize(
neg_ll,
x0=initial_params,
bounds=bounds,
method="L-BFGS-B" if bounds is not None else "BFGS",
)
best_x, best_f = result.x, float(result.fun)
# polish: L-BFGS-B with numeric gradients can stall on flat likelihoods
refine = minimize(neg_ll, x0=best_x, method="Nelder-Mead",
options={"maxiter": 2000, "xatol": 1e-8, "fatol": 1e-10})
if np.isfinite(refine.fun) and refine.fun < best_f:
best_x, best_f = refine.x, float(refine.fun)
if not np.isfinite(best_f):
raise RuntimeError("censored MLE optimization failed to find a finite likelihood.")
return model_class(*best_x)
[docs]
def kaplan_meier(values, truncation=None, censored=None):
r"""Product-limit (Kaplan-Meier) survival estimate under left truncation
and right censoring.
With event (uncensored) times :math:`y_1 < y_2 < \dots`, :math:`d_j`
events at :math:`y_j`, and risk set
:math:`R_j = \#\{i : t_i < y_j \le x_i\}`,
.. math:: \hat S(y) = \prod_{y_j \le y} \Big(1 - \frac{d_j}{R_j}\Big).
Under left truncation the estimator is *conditional on survival to the
smallest truncation point* :math:`t_{\min}`: it estimates
:math:`S(y)/S(t_{\min})`. With a single common deductible this is exactly
the conditional severity the data can identify. Risk sets near
:math:`t_{\min}` can be small when truncation points are heterogeneous;
interpret the left end with care.
Returns
-------
tuple of np.ndarray
``(times, survival)``: distinct event times and the estimated
survival immediately after each.
"""
values, trunc, cens = _prepare(values, truncation, censored)
event_vals = values[~cens]
if event_vals.size == 0:
raise ValueError("at least one uncensored observation is required.")
times, d = np.unique(event_vals, return_counts=True)
sorted_vals = np.sort(values)
sorted_trunc = np.sort(trunc)
# R_j = #{t_i < y_j} - #{x_i < y_j}; censored ties at y_j stay in the risk set
entered = np.searchsorted(sorted_trunc, times, side="left")
exited = np.searchsorted(sorted_vals, times, side="left")
risk = entered - exited
if np.any(risk <= 0):
raise ValueError("empty risk set encountered; truncation pattern too sparse.")
surv = np.cumprod(1.0 - d / risk)
return times, surv