"""Trend and projection primitives."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
import numpy as np
import pandas as pd
from actuarialpy.columns import as_list, validate_columns
from actuarialpy.metrics import safe_divide
[docs]
def period_change(current: Any, prior: Any) -> Any:
"""Calculate period-over-period change: current / prior - 1."""
return safe_divide(current, prior) - 1
[docs]
def annualized_trend(current: Any, prior: Any, months_between: float) -> Any:
"""Annualize change between two values separated by a number of months."""
if months_between <= 0:
raise ValueError("months_between must be positive")
return safe_divide(current, prior) ** (12 / months_between) - 1
[docs]
def trend_factor(annual_trend: Any, months: float) -> Any:
"""Convert an annual trend rate into a trend factor over a number of months."""
return (1 + annual_trend) ** (months / 12)
[docs]
def project_forward(value: Any, annual_trend: Any, months: float) -> Any:
"""Project a value forward using an annual trend rate."""
return value * trend_factor(annual_trend, months)
[docs]
def midpoint_trend_factor(base_midpoint, projection_midpoint, annual_trend: Any) -> Any:
"""Trend factor between base and projection midpoints."""
base = pd.to_datetime(base_midpoint)
projection = pd.to_datetime(projection_midpoint)
months = (projection.year - base.year) * 12 + (projection.month - base.month)
return trend_factor(annual_trend, months)
def _date_range_mask(df: pd.DataFrame, date_col: str, start, end) -> pd.Series:
dates = pd.to_datetime(df[date_col])
start_date = pd.to_datetime(start)
end_date = pd.to_datetime(end)
if end_date < start_date:
raise ValueError("range end must be greater than or equal to range start")
return (dates >= start_date) & (dates <= end_date)
def _comparison_masks(
df: pd.DataFrame,
*,
period_col: str | None = None,
prior_period=None,
current_period=None,
date_col: str | None = None,
prior_start=None,
prior_end=None,
current_start=None,
current_end=None,
prior_filter=None,
current_filter=None,
) -> tuple[pd.Series, pd.Series, str]:
period_args_supplied = period_col is not None or prior_period is not None or current_period is not None
date_args_supplied = (
date_col is not None
or prior_start is not None
or prior_end is not None
or current_start is not None
or current_end is not None
)
filter_args_supplied = prior_filter is not None or current_filter is not None
modes = sum([period_args_supplied, date_args_supplied, filter_args_supplied])
if modes != 1:
raise ValueError(
"Use exactly one comparison mode: period_col/prior_period/current_period, "
"date_col with prior/current ranges, or prior_filter/current_filter."
)
if period_args_supplied:
if period_col is None or prior_period is None or current_period is None:
raise ValueError("period_col, prior_period, and current_period must all be supplied together.")
return df[period_col] == prior_period, df[period_col] == current_period, "period"
if date_args_supplied:
if None in (date_col, prior_start, prior_end, current_start, current_end):
raise ValueError(
"date_col, prior_start, prior_end, current_start, and current_end must all be supplied together."
)
assert date_col is not None # narrowed by the guard above
return (
_date_range_mask(df, date_col, prior_start, prior_end),
_date_range_mask(df, date_col, current_start, current_end),
"date",
)
if prior_filter is None or current_filter is None:
raise ValueError("prior_filter and current_filter must be supplied together.")
return prior_filter, current_filter, "filter"
[docs]
def trend_summary(
df: pd.DataFrame,
*,
period_col: str | None = None,
prior_period=None,
current_period=None,
date_col: str | None = None,
prior_start=None,
prior_end=None,
current_start=None,
current_end=None,
groupby=None,
amount_col: str,
exposure_col: str | None = None,
prior_filter=None,
current_filter=None,
prior_label: str = "prior",
current_label: str = "current",
) -> pd.DataFrame:
"""Summarize current vs prior trend by optional grouping.
Supported comparison modes:
- ``period_col='year', prior_period=2025, current_period=2026``
- ``date_col='incurred_date'`` with prior/current start and end dates
- explicit boolean ``prior_filter`` and ``current_filter`` masks
"""
groups = as_list(groupby)
required = groups + [amount_col] + ([exposure_col] if exposure_col else [])
if period_col is not None:
required.append(period_col)
if date_col is not None:
required.append(date_col)
validate_columns(df, required)
prior_filter, current_filter, mode = _comparison_masks(
df,
period_col=period_col,
prior_period=prior_period,
current_period=current_period,
date_col=date_col,
prior_start=prior_start,
prior_end=prior_end,
current_start=current_start,
current_end=current_end,
prior_filter=prior_filter,
current_filter=current_filter,
)
def summarize(mask, label):
# Aggregate only grouping, amount, and exposure columns. The comparison
# column (for example, ``year``) is used only to select records and must
# not leak into the final output as a summed numeric column such as
# ``year_x`` / ``year_y``.
summary_cols = groups + [amount_col] + ([exposure_col] if exposure_col else [])
temp = df.loc[mask, summary_cols].copy()
if groups:
out = temp.groupby(groups, dropna=False, as_index=False).sum(numeric_only=True)
else:
out = pd.DataFrame({amount_col: [temp[amount_col].sum()]})
if exposure_col:
out[exposure_col] = temp[exposure_col].sum()
out = out.rename(columns={amount_col: f"{label}_{amount_col}"})
if exposure_col:
out = out.rename(columns={exposure_col: f"{label}_{exposure_col}"})
out[f"{label}_{amount_col}_per_{exposure_col}"] = safe_divide(
out[f"{label}_{amount_col}"], out[f"{label}_{exposure_col}"]
)
return out
prior = summarize(prior_filter, prior_label)
current = summarize(current_filter, current_label)
out = prior.merge(current, on=groups, how="outer") if groups else pd.concat([prior, current], axis=1)
prior_metric = f"{prior_label}_{amount_col}_per_{exposure_col}" if exposure_col else f"{prior_label}_{amount_col}"
current_metric = f"{current_label}_{amount_col}_per_{exposure_col}" if exposure_col else f"{current_label}_{amount_col}"
out["trend"] = period_change(out[current_metric], out[prior_metric])
if mode == "period":
out.insert(len(groups), "prior_period", prior_period)
out.insert(len(groups) + 1, "current_period", current_period)
elif mode == "date":
out.insert(len(groups), "prior_start", pd.to_datetime(prior_start))
out.insert(len(groups) + 1, "prior_end", pd.to_datetime(prior_end))
out.insert(len(groups) + 2, "current_start", pd.to_datetime(current_start))
out.insert(len(groups) + 3, "current_end", pd.to_datetime(current_end))
return out
def _inverse_normal_cdf(p: float) -> float:
"""Standard-normal quantile via Acklam's rational approximation (no SciPy)."""
a = [-3.969683028665376e+01, 2.209460984245205e+02, -2.759285104469687e+02,
1.383577518672690e+02, -3.066479806614716e+01, 2.506628277459239e+00]
b = [-5.447609879822406e+01, 1.615858368580409e+02, -1.556989798598866e+02,
6.680131188771972e+01, -1.328068155288572e+01]
c = [-7.784894002430293e-03, -3.223964580411365e-01, -2.400758277161838e+00,
-2.549732539343734e+00, 4.374664141464968e+00, 2.938163982698783e+00]
d = [7.784695709041462e-03, 3.224671290700398e-01, 2.445134137142996e+00, 3.754408661907416e+00]
p_low = 0.02425
if p < p_low:
q = np.sqrt(-2 * np.log(p))
return (((((c[0] * q + c[1]) * q + c[2]) * q + c[3]) * q + c[4]) * q + c[5]) / \
((((d[0] * q + d[1]) * q + d[2]) * q + d[3]) * q + 1)
if p > 1 - p_low:
q = np.sqrt(-2 * np.log(1 - p))
return -(((((c[0] * q + c[1]) * q + c[2]) * q + c[3]) * q + c[4]) * q + c[5]) / \
((((d[0] * q + d[1]) * q + d[2]) * q + d[3]) * q + 1)
q = p - 0.5
r = q * q
return (((((a[0] * r + a[1]) * r + a[2]) * r + a[3]) * r + a[4]) * r + a[5]) * q / \
(((((b[0] * r + b[1]) * r + b[2]) * r + b[3]) * r + b[4]) * r + 1)
def _student_t_ppf(p: float, df: float) -> float:
"""Student-t quantile via the Cornish-Fisher / A&S 26.7.5 expansion (no SciPy).
Accurate to within ~0.001 of tabulated values for df >= 5; widens (conservatively)
for very small df. Adequate for trend confidence intervals.
"""
x = _inverse_normal_cdf(p)
g1 = (x**3 + x) / 4
g2 = (5 * x**5 + 16 * x**3 + 3 * x) / 96
g3 = (3 * x**7 + 19 * x**5 + 17 * x**3 - 15 * x) / 384
g4 = (79 * x**9 + 776 * x**7 + 1482 * x**5 - 1920 * x**3 - 945 * x) / 92160
return x + g1 / df + g2 / df**2 + g3 / df**3 + g4 / df**4
[docs]
@dataclass(frozen=True)
class TrendFit:
"""Result of :func:`fit_trend`: an exponential trend fitted to a rate series.
``annual_trend`` is the fitted multiplicative annual trend (``exp(slope) - 1`` on the
log scale). ``r_squared`` is the goodness of fit, ``std_error`` the delta-method
standard error of ``annual_trend``, and ``(ci_low, ci_high)`` its confidence interval
(asymmetric -- the endpoints are transformed from the log-scale slope interval).
``slope`` and ``intercept`` describe the underlying ``log(value) = intercept + slope * t``
fit with ``t`` measured in years from the first period.
"""
annual_trend: float
r_squared: float
std_error: float
ci_low: float
ci_high: float
confidence: float
n_periods: int
slope: float
intercept: float
@property
def ci(self) -> tuple[float, float]:
"""The confidence interval as a ``(low, high)`` tuple."""
return (self.ci_low, self.ci_high)
[docs]
def factor(self, months: float) -> float:
"""Trend factor over ``months`` at the fitted rate: ``(1 + annual_trend) ** (months / 12)``."""
return (1.0 + self.annual_trend) ** (months / 12.0)
def __repr__(self) -> str:
return (
f"TrendFit(annual_trend={self.annual_trend:.2%}, R2={self.r_squared:.3f}, "
f"{self.confidence:.0%} CI [{self.ci_low:.2%}, {self.ci_high:.2%}], n={self.n_periods})"
)
[docs]
def fit_trend(
df: pd.DataFrame,
*,
value_col: str,
date_col: str,
exposure_col: str | None = None,
freq: str = "M",
min_periods: int = 3,
confidence: float = 0.95,
) -> TrendFit:
"""Fit an exponential trend to a rate series by log-linear regression.
Aggregates ``df`` to the ``freq`` grain (summing ``value_col`` and, if given,
``exposure_col``), forms the rate -- ``value / exposure`` (e.g. PMPM) when
``exposure_col`` is supplied, otherwise ``value`` itself -- and fits
``log(rate) = intercept + slope * t`` by ordinary least squares, with ``t`` in years
from the first period. The fitted annual trend is ``exp(slope) - 1``.
Unlike :func:`annualized_trend` (a two-point CAGR between a single current and prior
value), this uses every period, so one noisy month does not swing the estimate, and it
returns goodness of fit and a confidence interval -- what a *developed* (rather than
received) trend is judged on. It does not select the trend: the window, the rate basis
(allowed vs paid), any benefit leveraging, and the blend with external trends remain
judgment. Run it on completed, deseasonalized history (``complete -> deseasonalize ->
fit_trend``) so runout and seasonality do not contaminate the slope; apply the result
with :func:`trend_factor`/:meth:`TrendFit.factor` or :func:`adjust`.
Time is measured from actual period dates, so an occasional missing period is handled
correctly. Requires at least ``min_periods`` distinct periods with strictly positive
rates (non-positive values, which cannot be logged, raise). Returns a :class:`TrendFit`.
"""
if not 0.0 < confidence < 1.0:
raise ValueError("confidence must be between 0 and 1.")
cols = [value_col, date_col] + ([exposure_col] if exposure_col else [])
validate_columns(df, cols)
period = pd.PeriodIndex(pd.to_datetime(df[date_col]), freq=freq)
work = pd.DataFrame({"_value": pd.to_numeric(df[value_col]).to_numpy()}, index=period)
if exposure_col:
work["_exposure"] = pd.to_numeric(df[exposure_col]).to_numpy()
grouped = work.groupby(level=0).sum().sort_index()
rate = grouped["_value"] / grouped["_exposure"] if exposure_col else grouped["_value"]
rate = rate.to_numpy(dtype="float64")
if len(rate) < max(min_periods, 3):
raise ValueError(f"fit_trend needs at least {max(min_periods, 3)} periods; got {len(rate)}.")
if np.any(rate <= 0):
raise ValueError("fit_trend requires strictly positive rates (cannot take the log of <= 0).")
timestamps = grouped.index.to_timestamp()
t = (timestamps - timestamps[0]).days.to_numpy(dtype="float64") / 365.25
if np.ptp(t) == 0:
raise ValueError("fit_trend needs at least two distinct periods.")
y = np.log(rate)
n = len(y)
t_mean, y_mean = t.mean(), y.mean()
sxx = float(np.sum((t - t_mean) ** 2))
sxy = float(np.sum((t - t_mean) * (y - y_mean)))
slope = sxy / sxx
intercept = y_mean - slope * t_mean
residuals = y - (intercept + slope * t)
sse = float(np.sum(residuals**2))
sst = float(np.sum((y - y_mean) ** 2))
# a flat series has no variance to explain (sst ~ 0 up to rounding); a constant fits it
# perfectly, so R^2 is 1.0 there rather than the unstable 0/0 of 1 - sse/sst.
r_squared = 1.0 if sst <= 1e-12 * max(1.0, abs(y_mean)) else 1.0 - sse / sst
resid_var = sse / (n - 2)
slope_se = float(np.sqrt(resid_var / sxx))
annual_trend = float(np.exp(slope) - 1.0)
std_error = float(np.exp(slope) * slope_se) # delta method
t_crit = _student_t_ppf((1.0 + confidence) / 2.0, n - 2)
ci_low = float(np.exp(slope - t_crit * slope_se) - 1.0)
ci_high = float(np.exp(slope + t_crit * slope_se) - 1.0)
return TrendFit(
annual_trend=annual_trend, r_squared=r_squared, std_error=std_error,
ci_low=ci_low, ci_high=ci_high, confidence=confidence, n_periods=n,
slope=float(slope), intercept=float(intercept),
)