"""Stateful facade for experience-analysis workflows."""
from __future__ import annotations
from collections.abc import Iterable
from dataclasses import dataclass, replace
from typing import Any, cast
import pandas as pd
from pandas.api.types import is_numeric_dtype
from actuarialpy.banding import summarize_by_band
from actuarialpy.claimants import claim_concentration, summarize_claimants, top_claimants
from actuarialpy.cohorts import cohort_summary, duration_summary
from actuarialpy.columns import as_list, sum_columns, validate_columns
from actuarialpy.components import component_driver_analysis, summarize_components
from actuarialpy.credibility import credibility_weighted_estimate
from actuarialpy.decomposition import decompose_per_exposure_trend, frequency_severity_summary
from actuarialpy.expected import summarize_actual_vs_expected
from actuarialpy.experience import status_summary, summarize_experience, summarize_views
from actuarialpy.lifecycle import derive_status
from actuarialpy.metrics import per_exposure, safe_divide
from actuarialpy.pooling import pool_losses
from actuarialpy.adjustments import adjust as _adjust
from actuarialpy.reserving import apply_completion as _apply_completion
from actuarialpy.rolling import rolling_summary
from actuarialpy.seasonality import deseasonalize as _deseasonalize
from actuarialpy.trend import TrendFit, _comparison_masks, fit_trend as _fit_trend, trend_summary
_ID_LIKE_EXPOSURE_NAMES = {"member_id", "subscriber_id", "group_id", "employee_id", "policy_id", "claim_id"}


def _validate_exposure_names(exposures: list[str]) -> None:
    """Reject exposure column names that look like identifiers.

    A name is rejected when its lower-cased form is listed in
    ``_ID_LIKE_EXPOSURE_NAMES`` or ends with ``_id``.

    Raises
    ------
    ValueError
        If any name is rejected; the message lists every offending name in
        input order, with its original casing.
    """
    offenders = []
    for name in exposures:
        lowered = name.lower()
        if lowered in _ID_LIKE_EXPOSURE_NAMES or lowered.endswith("_id"):
            offenders.append(name)
    if not offenders:
        return
    raise ValueError(
        "Exposure columns must be numeric exposure measures, not identifiers. "
        f"Invalid exposure column(s): {offenders}."
    )
def _validate_numeric_columns(df: pd.DataFrame, cols: list[str], *, role: str) -> None:
    """Require every column named in ``cols`` to have a numeric dtype in ``df``.

    ``role`` is only used to label the error message (e.g. ``"Expense"``).

    Raises
    ------
    ValueError
        If at least one column is not numeric according to
        ``pandas.api.types.is_numeric_dtype``; the message lists all of them.
    """
    non_numeric = []
    for name in cols:
        if is_numeric_dtype(df[name]):
            continue
        non_numeric.append(name)
    if non_numeric:
        raise ValueError(f"{role} columns must be numeric. Non-numeric column(s): {non_numeric}.")
[docs]
@dataclass(frozen=True)
class Experience:
    """Bind an experience dataset to its actuarial column roles.

    ``Experience`` is the recommended entry point for repeated experience-analysis
    workflows. It stores common column roles once and delegates calculations to
    the package's free functions. The object is immutable: methods return
    DataFrames or new ``Experience`` objects rather than changing stored data in
    place.

    Bind ``count`` (a claim or service count) to unlock the frequency-severity views:
    :meth:`frequency_severity` and :meth:`decompose_trend` (frequency x severity,
    optionally x mix). :meth:`fit_trend` regresses a developed trend on the bound history.

    **Grain matters.** ``Experience`` aggregates by *summing* the bound columns, so it
    expects rows at the grain of the exposure unit -- one row per member-month, with
    ``member_months`` = 1 (or the eligible fraction). If your data is *long* (one row per
    service line, so the same member-month repeats across several rows), summing the
    exposure column overcounts it, and every per-exposure figure -- PMPM, frequency, the
    loss-ratio denominator -- is wrong by the number of rows per member-month. ``Experience``
    does not detect this: it has no member key, so it cannot tell a long frame from a wide
    one. For long or multi-table warehouse data, either aggregate to member-month grain
    first, or use :meth:`bind`, which sources exposure from a correctly-grained table (e.g.
    eligibility) via :class:`~actuarialpy.Count` and never sums a repeated column.
    """

    # The bound dataset. Replaced by a copy in __post_init__ when ``copy`` is true.
    data: pd.DataFrame
    # Expense column name(s); normalized with ``as_list`` in __post_init__ and
    # required to be numeric columns of ``data``.
    expense: str | list[str]
    # Revenue column name(s); normalized and validated the same way as ``expense``.
    revenue: str | list[str]
    # Optional exposure column name(s); normalized with ``as_list`` and rejected when
    # a name looks like an identifier (see ``_validate_exposure_names``).
    exposure: str | list[str] | None = None
    # Default date column used by methods when their ``date_col`` is omitted.
    date: str | None = None
    # Forwarded as ``profile=`` to the summary functions that accept it.
    profile: str | None = None
    # Default count column used by the frequency-severity methods.
    count: str | None = None
    # When true, __post_init__ stores ``data.copy()`` instead of the frame passed in.
    copy: bool = False
def __post_init__(self) -> None:
    """Normalize the role fields, optionally copy the data, then validate.

    The list-valued roles (``expense``, ``revenue``, ``exposure``) are passed
    through ``as_list``; ``data`` is replaced by a copy when ``copy`` is true.
    Validation then checks, in order: that every bound column exists in
    ``data``, that exposure names do not look like identifiers, and that the
    expense, revenue, exposure and (if bound) count columns are numeric.
    """
    # The dataclass is frozen, so fields are rewritten via object.__setattr__.
    for role in ("expense", "revenue", "exposure"):
        object.__setattr__(self, role, as_list(getattr(self, role)))
    if self.copy:
        object.__setattr__(self, "data", self.data.copy())

    expense_cols = as_list(self.expense)
    revenue_cols = as_list(self.revenue)
    exposure_cols = as_list(self.exposure)

    required = [*expense_cols, *revenue_cols, *exposure_cols]
    required.extend(name for name in (self.date, self.count) if name is not None)
    validate_columns(self.data, required)

    _validate_exposure_names(exposure_cols)
    numeric_roles = (
        (expense_cols, "Expense"),
        (revenue_cols, "Revenue"),
        (exposure_cols, "Exposure"),
    )
    for names, label in numeric_roles:
        _validate_numeric_columns(self.data, names, role=label)
    if self.count is not None:
        _validate_numeric_columns(self.data, [self.count], role="Count")
[docs]
def with_roles(
    self,
    *,
    data: pd.DataFrame | None = None,
    expense: str | list[str] | None = None,
    revenue: str | list[str] | None = None,
    exposure: str | list[str] | None = None,
    date: str | None = None,
    profile: str | None = None,
    count: str | None = None,
    copy: bool | None = None,
) -> "Experience":
    """Return a new ``Experience`` object with updated data or roles.

    Every argument left as ``None`` keeps the value currently stored on this
    object, so ``None`` cannot be used here to clear an optional role.
    """
    requested = {
        "data": data,
        "expense": expense,
        "revenue": revenue,
        "exposure": exposure,
        "date": date,
        "profile": profile,
        "count": count,
        "copy": copy,
    }
    # Only explicit (non-None) arguments override; dataclasses.replace carries
    # the remaining fields over from ``self``.
    changes = {name: value for name, value in requested.items() if value is not None}
    return replace(self, **changes)
[docs]
def filter(
self,
mask: Any | None = None,
*,
query: str | None = None,
copy: bool = True,
) -> "Experience":
"""Return a new ``Experience`` object over a filtered dataset.
Use either a boolean mask or a pandas query string.
"""
if (mask is None) == (query is None):
raise ValueError("Pass exactly one of mask or query.")
if query is not None:
data = self.data.query(query)
else:
data = cast("pd.DataFrame", self.data.loc[mask])
if copy:
data = data.copy()
return self.with_roles(data=data, copy=False)
[docs]
def deseasonalize(
    self,
    factors: pd.Series,
    *,
    columns: str | list[str] | None = None,
    freq: str = "M",
    by: str | list[str] | None = None,
    date_col: str | None = None,
) -> "Experience":
    """Return a new ``Experience`` with the seasonal pattern divided out.

    Each selected column is run through the package's ``deseasonalize``
    function and written back under its own name, so downstream views
    (:meth:`trend`, :meth:`rolling`, :meth:`by`, ...) operate on the
    deseasonalized series. The bound expense columns are adjusted by default;
    pass ``columns`` to choose others. Exposure is not touched.

    ``factors`` may be a flat Series (one pattern, as produced by
    :func:`seasonality_factors`) or a tidy per-segment table from
    :func:`seasonality_factors_by`; with the latter, pass ``by`` naming the
    grouping column(s) to join on. Estimate factors on the broader pool rather
    than on this object's own (often thin) data. To put the pattern back, apply
    :func:`apply_seasonality` to ``.data``.

    Raises
    ------
    ValueError
        If no date column can be resolved or there are no columns to adjust.
    """
    season_date = self._resolve_date_col(date_col)
    targets = as_list(self.expense) if columns is None else as_list(columns)
    if not targets:
        raise ValueError("No columns to deseasonalize; pass columns=... or bind an expense role.")
    validate_columns(self.data, [*targets, season_date, *as_list(by)])

    # One up-front copy; each pass then writes into it (copy=False) so the
    # frame bound to this object is never modified.
    working = self.data.copy()
    for target in targets:
        working = _deseasonalize(
            working,
            factors,
            date_col=season_date,
            value_col=target,
            freq=freq,
            by=by,
            out_col=target,
            copy=False,
        )
    return self.with_roles(data=working, copy=False)
[docs]
def complete(
    self,
    factors: pd.Series,
    *,
    valuation_date: Any = None,
    columns: str | list[str] | None = None,
    development_col: str | None = None,
    by: str | list[str] | None = None,
    date_col: str | None = None,
) -> "Experience":
    """Return a new ``Experience`` with paid amounts developed to ultimate.

    The selected columns (the bound expense columns unless ``columns`` is
    given) are grossed up to estimated ultimate under their own names --
    ``completed = paid / completion_factor`` -- so downstream views
    (:meth:`trend`, :meth:`rolling`, :meth:`by`, ...) run on the completed
    series. Each row's development period is
    ``development_months(date, valuation_date)`` (the convention
    :func:`make_completion_triangle` uses), or an explicit ``development_col``;
    when ``development_col`` is given no date column is resolved. The join is
    by value, so the frame's index is irrelevant; rows past the triangle's last
    development period are taken as fully complete.

    ``factors`` may be a flat Series (from :func:`completion_factors`) or a
    tidy per-segment table from :func:`completion_factors_by`; with the latter,
    pass ``by`` naming the grouping column(s). Only the numerator is developed
    -- exposure is left untouched. This applies to the latest-diagonal shape
    (one row per incurred month, paid-to-date as of ``valuation_date``); a
    frame already on an ultimate basis must not be completed again.

    Raises
    ------
    ValueError
        If there are no columns to complete, or a date column is needed but
        cannot be resolved.
    """
    targets = as_list(self.expense) if columns is None else as_list(columns)
    if not targets:
        raise ValueError("No columns to complete; pass columns=... or bind an expense role.")

    if development_col is not None:
        # An explicit development column replaces the date-based derivation.
        incurred_date = None
        key_col = development_col
    else:
        incurred_date = self._resolve_date_col(date_col)
        key_col = incurred_date
    validate_columns(self.data, [*targets, key_col, *as_list(by)])

    # One up-front copy; each pass writes into it (copy=False).
    working = self.data.copy()
    for target in targets:
        working = _apply_completion(
            working,
            factors,
            value_col=target,
            date_col=incurred_date,
            valuation_date=valuation_date,
            development_col=development_col,
            by=by,
            out_col=target,
            copy=False,
        )
    return self.with_roles(data=working, copy=False)
[docs]
def adjust(
    self,
    factors: float | int | pd.Series | pd.DataFrame,
    *,
    on: str | list[str] | None = None,
    columns: str | list[str] | None = None,
    by: str | list[str] | None = None,
    how: str = "multiply",
    factor_col: str = "factor",
    audit_col: str | None = None,
    default: float | None = None,
) -> "Experience":
    """Return a new ``Experience`` with an expense column restated by a factor.

    The general counterpart to :meth:`complete` and :meth:`deseasonalize`: joins a
    factor by the key ``on`` (a column already in the frame, optionally within ``by``
    segments) and multiplies -- or, with ``how="divide"``, divides -- the selected
    column(s) in place under the same name, so every downstream view composes on the
    restated series. ``factors`` is a scalar (one factor for all rows), a Series
    indexed by ``on``, or a tidy DataFrame keyed by ``by + on``.

    This is the spine of experience-period restatement -- trend, benefit / area /
    demographic relativities, network discounts -- where the methodology is supplied
    as the factors rather than encoded here. Chain freely
    (``exp.complete(...).adjust(trend).adjust(area, on="region")``); with ``audit_col``
    the cumulative restatement multiplier is carried across the chain, one value per
    row, for a reviewable audit trail. When several columns are restated in one call
    the audit column is updated once for the call, not once per column. An absent key
    surfaces as ``NaN`` unless ``default`` is given (``default=1.0`` to mean "no
    adjustment for this key").

    Raises
    ------
    ValueError
        If there are no columns to adjust (no ``columns`` and no bound expense).
    """
    cols = as_list(columns) if columns is not None else as_list(self.expense)
    if not cols:
        raise ValueError("No columns to adjust; pass columns=... or bind an expense role.")
    validate_columns(self.data, cols + as_list(on) + as_list(by))

    # One up-front copy; each pass writes into it (copy=False).
    data = self.data.copy()
    for position, col in enumerate(cols):
        data = _adjust(
            data,
            factors,
            value_col=col,
            on=on,
            by=by,
            how=how,
            factor_col=factor_col,
            out_col=col,
            # The same factor is applied to every column, so the audit
            # multiplier must be recorded once per call. Handing audit_col to
            # every pass would compound it once per column (assumes the free
            # function accumulates into audit_col, as the chaining contract
            # above describes).
            audit_col=audit_col if position == 0 else None,
            default=default,
            copy=False,
        )
    return self.with_roles(data=data, copy=False)
[docs]
def by(self, groupby: str | list[str] | None = None, **kwargs: Any) -> pd.DataFrame:
    """Summarize experience by optional grouping columns.

    Delegates to ``summarize_experience`` with the bound roles. A role can be
    overridden per call with ``expense`` / ``revenue`` / ``exposure`` or the
    ``*_cols`` spelling (the ``*_cols`` spelling wins when both are given), and
    ``profile`` likewise; all other keyword arguments are forwarded unchanged.
    """
    role_args = {}
    for role in ("expense", "revenue", "exposure"):
        # Both spellings are consumed so neither leaks into the forwarded kwargs.
        fallback = kwargs.pop(role, getattr(self, role))
        role_args[f"{role}_cols"] = kwargs.pop(f"{role}_cols", fallback)
    profile = kwargs.pop("profile", self.profile)
    return summarize_experience(
        self.data,
        groupby=groupby,
        **role_args,
        profile=profile,
        **kwargs,
    )
[docs]
def views(self, views: dict[str, str | Iterable[str] | None], **kwargs: Any) -> dict[str, pd.DataFrame]:
    """Create several named grouped experience views.

    Delegates to ``summarize_views`` with the bound roles. ``expense`` /
    ``revenue`` / ``exposure`` (or their ``*_cols`` spellings, which take
    precedence) and ``profile`` may be overridden through ``kwargs``; anything
    else is forwarded unchanged.
    """
    expense_alias = kwargs.pop("expense", self.expense)
    expense_cols = kwargs.pop("expense_cols", expense_alias)
    revenue_alias = kwargs.pop("revenue", self.revenue)
    revenue_cols = kwargs.pop("revenue_cols", revenue_alias)
    exposure_alias = kwargs.pop("exposure", self.exposure)
    exposure_cols = kwargs.pop("exposure_cols", exposure_alias)
    profile = kwargs.pop("profile", self.profile)
    return summarize_views(
        self.data,
        views=views,
        expense_cols=expense_cols,
        revenue_cols=revenue_cols,
        exposure_cols=exposure_cols,
        profile=profile,
        **kwargs,
    )
[docs]
def rolling(
    self,
    window: int = 12,
    *,
    groupby: str | list[str] | None = None,
    date_col: str | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Create a rolling-period experience summary.

    Delegates to ``rolling_summary`` using ``date_col`` (or the bound ``date``)
    and the bound roles. ``expense`` / ``revenue`` / ``exposure`` or their
    ``*_cols`` spellings (which take precedence) may be overridden through
    ``kwargs``; anything else is forwarded unchanged.

    Raises
    ------
    ValueError
        If no date column is passed and none is bound.
    """
    period_col = self._resolve_date_col(date_col)
    role_args = {}
    for role in ("expense", "revenue", "exposure"):
        fallback = kwargs.pop(role, getattr(self, role))
        role_args[f"{role}_cols"] = kwargs.pop(f"{role}_cols", fallback)
    return rolling_summary(
        self.data,
        date_col=period_col,
        window=window,
        groupby=groupby,
        **role_args,
        **kwargs,
    )
[docs]
def trend(
    self,
    *,
    amount_col: str | None = None,
    exposure_col: str | None = None,
    groupby: str | list[str] | None = None,
    date_col: str | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Compare amount or per-exposure experience between two periods.

    Delegates to ``trend_summary``. The amount defaults to the bound expense
    (summed into a temporary column when several are bound) and the exposure to
    the single bound exposure, if any. The comparison itself is described by
    the forwarded ``kwargs`` (``period_col`` / ``prior_period`` /
    ``current_period``, date ranges, or ``prior_filter`` / ``current_filter``).

    The bound ``date`` is supplied only as a default for date-range
    comparisons: it is withheld when the caller selects period mode
    (``period_col``) or filter mode (``prior_filter`` / ``current_filter``)
    without an explicit ``date_col``, matching :meth:`decompose_trend`.
    """
    data, resolved_amount = self._data_with_amount(amount_col)

    resolved_date = date_col if date_col is not None else self.date
    if date_col is None:
        # Forwarding the bound date next to another comparison mode would
        # make it look like two modes were requested and raise spuriously.
        period_mode = "period_col" in kwargs
        filter_mode = any(kwargs.get(key) is not None for key in ("prior_filter", "current_filter"))
        if period_mode or filter_mode:
            resolved_date = None

    return trend_summary(
        data,
        amount_col=resolved_amount,
        exposure_col=exposure_col or self._single_exposure_or_none(),
        groupby=groupby,
        date_col=resolved_date,
        **kwargs,
    )
[docs]
def frequency_severity(
    self,
    *,
    count_col: str | None = None,
    loss_col: str | None = None,
    exposure_col: str | None = None,
    groupby: str | list[str] | None = None,
) -> pd.DataFrame:
    """Per-group claim frequency, severity, and per-exposure loss (see ``frequency_severity_summary``).

    The bound ``count``, ``expense`` (as the loss) and ``exposure`` roles are
    used unless the matching ``*_col`` argument overrides them. The identity
    ``loss_per_exposure == frequency * severity`` holds for every row.

    Raises
    ------
    ValueError
        If no count or exposure column can be resolved.
    """
    frame, loss_name = self._data_with_amount(loss_col)
    count_name = self._resolve_count(count_col)
    exposure_name = self._resolve_exposure(exposure_col)
    return frequency_severity_summary(
        frame,
        count_col=count_name,
        loss_col=loss_name,
        exposure_col=exposure_name,
        groupby=groupby,
    )
[docs]
def decompose_trend(
    self,
    *,
    count_col: str | None = None,
    loss_col: str | None = None,
    exposure_col: str | None = None,
    mix_by: str | Iterable[str] | None = None,
    groupby: str | list[str] | None = None,
    period_col: str | None = None,
    prior_period: Any = None,
    current_period: Any = None,
    date_col: str | None = None,
    prior_start: Any = None,
    prior_end: Any = None,
    current_start: Any = None,
    current_end: Any = None,
    prior_filter: Any = None,
    current_filter: Any = None,
) -> pd.DataFrame:
    """Decompose the per-exposure loss trend between two periods of the bound data.

    The bound frame is split into prior and current rows with the same
    comparison modes as :meth:`trend` -- ``period_col`` with ``prior_period`` /
    ``current_period``, a ``date_col`` with prior/current ranges, or explicit
    ``prior_filter`` / ``current_filter`` masks. The bound ``date`` is used only
    when a date range is given without ``date_col``. The two slices are then
    passed to :func:`decompose_per_exposure_trend` with the bound ``count``,
    ``expense`` (as the loss) and ``exposure`` roles. Pass ``mix_by`` to add the
    third LMDI mix term; ``groupby`` reports one decomposition per group.

    Raises
    ------
    ValueError
        If no count or exposure column can be resolved.
    """
    count_name = self._resolve_count(count_col)
    exposure_name = self._resolve_exposure(exposure_col)
    frame, loss_name = self._data_with_amount(loss_col)

    # A date column is forwarded only when the caller asked for date mode;
    # otherwise the bound date would clash with period or filter mode.
    date_inputs = (date_col, prior_start, prior_end, current_start, current_end)
    if all(value is None for value in date_inputs):
        comparison_date = None
    elif date_col is not None:
        comparison_date = date_col
    else:
        comparison_date = self.date

    prior_rows, current_rows, _unused = _comparison_masks(
        frame,
        period_col=period_col,
        prior_period=prior_period,
        current_period=current_period,
        date_col=comparison_date,
        prior_start=prior_start,
        prior_end=prior_end,
        current_start=current_start,
        current_end=current_end,
        prior_filter=prior_filter,
        current_filter=current_filter,
    )
    prior_frame = frame.loc[prior_rows]
    current_frame = frame.loc[current_rows]
    return decompose_per_exposure_trend(
        prior_frame,
        current_frame,
        count_col=count_name,
        loss_col=loss_name,
        exposure_col=exposure_name,
        on=groupby,
        mix_by=mix_by,
    )
[docs]
def fit_trend(
    self,
    *,
    value_col: str | None = None,
    exposure_col: str | None = None,
    date_col: str | None = None,
    freq: str = "M",
    min_periods: int = 3,
    confidence: float = 0.95,
) -> TrendFit:
    """Fit an exponential trend to the bound experience by log-linear regression.

    By default the bound ``expense`` (claims) over the bound ``exposure`` --
    the PMPM trend -- is fitted across the bound ``date``. Pass ``value_col`` /
    ``exposure_col`` / ``date_col`` to override; with no exposure bound and
    none passed, the raw amount is trended. Returns a ``TrendFit`` (see
    :func:`fit_trend`). Run on completed, deseasonalized history.

    Raises
    ------
    ValueError
        If no date column can be resolved, or several exposures are bound and
        ``exposure_col`` is not given.
    """
    frame, value_name = self._data_with_amount(value_col)
    if exposure_col is None:
        exposure_name = self._single_exposure_or_none()
    else:
        exposure_name = exposure_col
    period_col = self._resolve_date_col(date_col)
    return _fit_trend(
        frame,
        value_col=value_name,
        date_col=period_col,
        exposure_col=exposure_name,
        freq=freq,
        min_periods=min_periods,
        confidence=confidence,
    )
[docs]
def components(
    self,
    component_cols: str | list[str],
    *,
    exposure_col: str | None = None,
    groupby: str | list[str] | None = None,
    date_col: str | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Explain component drivers between two periods.

    Delegates to ``component_driver_analysis`` over the bound data, using the
    single bound exposure unless ``exposure_col`` is given. The comparison is
    described by the forwarded ``kwargs`` (``period_col`` / ``prior_period`` /
    ``current_period``, date ranges, or ``prior_filter`` / ``current_filter``).

    The bound ``date`` is supplied only as a default for date-range
    comparisons: it is withheld when the caller selects period mode
    (``period_col``) or filter mode (``prior_filter`` / ``current_filter``)
    without an explicit ``date_col``.
    """
    resolved_date = date_col if date_col is not None else self.date
    if date_col is None:
        # Forwarding the bound date next to another comparison mode would
        # make it look like two modes were requested and raise spuriously.
        period_mode = "period_col" in kwargs
        filter_mode = any(kwargs.get(key) is not None for key in ("prior_filter", "current_filter"))
        if period_mode or filter_mode:
            resolved_date = None

    return component_driver_analysis(
        self.data,
        component_cols=component_cols,
        exposure_col=exposure_col or self._single_exposure_or_none(),
        groupby=groupby,
        date_col=resolved_date,
        **kwargs,
    )
[docs]
def component_summary(
    self,
    component_cols: str | list[str],
    *,
    groupby: str | list[str] | None = None,
    exposure_col: str | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Summarize component amounts, per-exposure values, and shares.

    Delegates to ``summarize_components``; when ``exposure_col`` is omitted
    (or falsy) the single bound exposure, if any, is used. Extra keyword
    arguments are forwarded unchanged.
    """
    if exposure_col:
        exposure_name = exposure_col
    else:
        exposure_name = self._single_exposure_or_none()
    return summarize_components(
        self.data,
        groupby=groupby,
        component_cols=component_cols,
        exposure_col=exposure_name,
        **kwargs,
    )
[docs]
def actual_vs_expected(
    self,
    expected: str | list[str],
    *,
    actual: str | list[str] | None = None,
    groupby: str | list[str] | None = None,
    exposure: str | list[str] | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Summarize actual-versus-expected experience.

    Delegates to ``summarize_actual_vs_expected``. The bound expense columns
    stand in for ``actual`` and the bound exposure for ``exposure`` whenever
    the argument is omitted; extra keyword arguments are forwarded unchanged.
    """
    actual_cols = actual if actual is not None else self.expense
    exposure_cols = exposure if exposure is not None else self.exposure
    return summarize_actual_vs_expected(
        self.data,
        groupby=groupby,
        actual_cols=actual_cols,
        expected_cols=expected,
        exposure_cols=exposure_cols,
        **kwargs,
    )
[docs]
def claimants(
    self,
    claimant_col: str,
    *,
    amount_cols: str | list[str] | None = None,
    groupby: str | list[str] | None = None,
    exposure_col: str | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Aggregate the experience to claimant/member/risk level.

    Delegates to ``summarize_claimants``; the bound expense columns are used
    when ``amount_cols`` is omitted. ``exposure_col`` is passed through as
    given (the bound exposure is not substituted here).
    """
    amounts = amount_cols if amount_cols is not None else self.expense
    return summarize_claimants(
        self.data,
        claimant_col=claimant_col,
        amount_cols=amounts,
        groupby=groupby,
        exposure_col=exposure_col,
        **kwargs,
    )
[docs]
def top_claimants(
    self,
    claimant_col: str,
    *,
    amount_cols: str | list[str] | None = None,
    amount_col: str | None = None,
    groupby: str | list[str] | None = None,
    n: int = 25,
    **kwargs: Any,
) -> pd.DataFrame:
    """Return top claimants by amount.

    Delegates to the module-level ``top_claimants`` function. The bound expense
    columns are used only when neither ``amount_cols`` nor ``amount_col`` is
    given; otherwise both arguments are forwarded as passed.
    """
    if amount_cols is None and amount_col is None:
        amounts = self.expense
    else:
        amounts = amount_cols
    # Inside a method body this name resolves to the module-level function,
    # not to this method.
    return top_claimants(
        self.data,
        claimant_col=claimant_col,
        amount_cols=amounts,
        amount_col=amount_col,
        groupby=groupby,
        n=n,
        **kwargs,
    )
[docs]
def claimant_concentration(
    self,
    claimant_col: str,
    *,
    amount_cols: str | list[str] | None = None,
    groupby: str | list[str] | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Summarize how concentrated experience is among top claimants.

    First aggregates to claimant level with ``summarize_claimants`` (bound
    expense columns unless ``amount_cols`` is given), then passes that summary
    to ``claim_concentration`` together with ``groupby`` and any extra keyword
    arguments.
    """
    amounts = amount_cols if amount_cols is not None else self.expense
    per_claimant = summarize_claimants(
        self.data,
        claimant_col=claimant_col,
        amount_cols=amounts,
        groupby=groupby,
    )
    return claim_concentration(per_claimant, groupby=groupby, **kwargs)
[docs]
def cohort(
    self,
    *,
    entity_col: str,
    start_date_col: str,
    duration_months: int = 12,
    groupby: str | list[str] | None = None,
    date_col: str | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Summarize each entity's first N months or cohort-duration window.

    Delegates to ``cohort_summary`` using ``date_col`` (or the bound ``date``)
    and the bound roles. ``expense`` / ``revenue`` / ``exposure`` or their
    ``*_cols`` spellings (which take precedence) and ``profile`` may be
    overridden through ``kwargs``; anything else is forwarded unchanged.

    Raises
    ------
    ValueError
        If no date column is passed and none is bound.
    """
    period_col = self._resolve_date_col(date_col)
    role_args = {}
    for role in ("expense", "revenue", "exposure"):
        fallback = kwargs.pop(role, getattr(self, role))
        role_args[f"{role}_cols"] = kwargs.pop(f"{role}_cols", fallback)
    profile = kwargs.pop("profile", self.profile)
    return cohort_summary(
        self.data,
        entity_col=entity_col,
        date_col=period_col,
        start_date_col=start_date_col,
        duration_months=duration_months,
        groupby=groupby,
        **role_args,
        profile=profile,
        **kwargs,
    )
[docs]
def duration(
    self,
    *,
    entity_col: str,
    start_date_col: str,
    max_duration_month: int | None = None,
    date_col: str | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Summarize experience by duration month since entity start.

    Delegates to ``duration_summary`` using ``date_col`` (or the bound
    ``date``) and the bound roles. ``expense`` / ``revenue`` / ``exposure`` or
    their ``*_cols`` spellings (which take precedence) may be overridden
    through ``kwargs``; anything else is forwarded unchanged.

    Raises
    ------
    ValueError
        If no date column is passed and none is bound.
    """
    period_col = self._resolve_date_col(date_col)
    expense_alias = kwargs.pop("expense", self.expense)
    expense_cols = kwargs.pop("expense_cols", expense_alias)
    revenue_alias = kwargs.pop("revenue", self.revenue)
    revenue_cols = kwargs.pop("revenue_cols", revenue_alias)
    exposure_alias = kwargs.pop("exposure", self.exposure)
    exposure_cols = kwargs.pop("exposure_cols", exposure_alias)
    return duration_summary(
        self.data,
        entity_col=entity_col,
        date_col=period_col,
        start_date_col=start_date_col,
        expense_cols=expense_cols,
        revenue_cols=revenue_cols,
        exposure_cols=exposure_cols,
        max_duration_month=max_duration_month,
        **kwargs,
    )
[docs]
def by_status(self, status_col: str, *, entity_col: str | None = None, **kwargs: Any) -> pd.DataFrame:
    """Summarize experience by a status column.

    Delegates to ``status_summary`` with the bound roles. ``expense`` /
    ``revenue`` / ``exposure`` or their ``*_cols`` spellings (which take
    precedence) and ``profile`` may be overridden through ``kwargs``; anything
    else is forwarded unchanged.
    """
    role_args = {}
    for role in ("expense", "revenue", "exposure"):
        fallback = kwargs.pop(role, getattr(self, role))
        role_args[f"{role}_cols"] = kwargs.pop(f"{role}_cols", fallback)
    profile = kwargs.pop("profile", self.profile)
    return status_summary(
        self.data,
        status_col=status_col,
        entity_col=entity_col,
        **role_args,
        profile=profile,
        **kwargs,
    )
[docs]
def with_status(
    self,
    *,
    effective_col: str,
    as_of: Any,
    termination_col: str | None = None,
    first_year_months: int = 12,
    status_col: str = "status",
    labels: dict[str, str] | None = None,
) -> "Experience":
    """Return a new ``Experience`` with a derived lifecycle status column.

    Active / first-year / termed status is derived from the effective and
    termination dates as of a reference date (see
    :func:`actuarialpy.derive_status`); the frame it returns is bound to the
    new object with the same roles. Summarize the result with
    :meth:`by_status`.
    """
    options = {
        "effective_col": effective_col,
        "as_of": as_of,
        "termination_col": termination_col,
        "first_year_months": first_year_months,
        "status_col": status_col,
        "labels": labels,
    }
    with_status_col = derive_status(self.data, **options)
    return self.with_roles(data=with_status_col, copy=False)
[docs]
def by_band(
    self,
    value_col: str,
    bands: Any,
    *,
    labels: Any = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Summarize experience by a size band on ``value_col`` (see ``summarize_by_band``).

    The bound roles are used unless overridden through ``kwargs`` with
    ``expense`` / ``revenue`` / ``exposure`` or their ``*_cols`` spellings
    (which take precedence), or ``profile``; anything else is forwarded
    unchanged.
    """
    expense_alias = kwargs.pop("expense", self.expense)
    expense_cols = kwargs.pop("expense_cols", expense_alias)
    revenue_alias = kwargs.pop("revenue", self.revenue)
    revenue_cols = kwargs.pop("revenue_cols", revenue_alias)
    exposure_alias = kwargs.pop("exposure", self.exposure)
    exposure_cols = kwargs.pop("exposure_cols", exposure_alias)
    profile = kwargs.pop("profile", self.profile)
    return summarize_by_band(
        self.data,
        value_col,
        bands,
        labels=labels,
        expense_cols=expense_cols,
        revenue_cols=revenue_cols,
        exposure_cols=exposure_cols,
        profile=profile,
        **kwargs,
    )
[docs]
def margin(
    self,
    groupby: str | list[str] | None = None,
    *,
    margin_col: str = "margin",
    ratio_col: str = "margin_ratio",
    per_exposure_col: str | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Underwriting margin (revenue net of expense) by optional grouping.

    Starts from the :meth:`by` summary and adds ``margin_col``
    (``total_revenue - total_expense``) and ``ratio_col`` (margin over
    ``total_revenue`` via ``safe_divide``). When ``per_exposure_col`` is given,
    the margin per unit of the single bound exposure is added under that name.

    Raises
    ------
    ValueError
        If ``per_exposure_col`` is requested but no exposure is bound, or
        several exposures are bound.
    """
    result = self.by(groupby, **kwargs)
    result[margin_col] = result["total_revenue"] - result["total_expense"]
    result[ratio_col] = safe_divide(result[margin_col], result["total_revenue"])
    if per_exposure_col is None:
        return result

    exposure_name = self._single_exposure_or_none()
    if exposure_name is None:
        raise ValueError("A single bound exposure is required for per_exposure_col.")
    result[per_exposure_col] = per_exposure(result[margin_col], result[exposure_name])
    return result
[docs]
def credibility_weighted(
    self,
    groupby: str | list[str],
    *,
    z: Any,
    metric: str = "loss_ratio",
    complement: float | None = None,
    out_col: str | None = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Blend each group's ``metric`` with a complement at credibility ``z``.

    The grouped summary comes from :meth:`by`; ``metric`` is then blended
    toward ``complement`` using ``z`` (see
    :func:`actuarialpy.credibility_weighted_estimate`). ``z`` may be a scalar
    or values aligned to the grouped rows. When ``complement`` is omitted, the
    ungrouped (book-level) value of ``metric`` is used. The result is stored in
    ``out_col``, or ``credibility_weighted_<metric>`` when that is omitted.

    Raises
    ------
    ValueError
        If ``metric`` is not a column of the grouped summary.
    """
    grouped = self.by(groupby, **kwargs)
    if metric not in grouped.columns:
        raise ValueError(f"metric '{metric}' is not in the summary columns: {list(grouped.columns)}")

    if complement is None:
        # Book-level fallback: the same summary without any grouping.
        book = self.by(**kwargs)
        complement = book[metric].iloc[0]

    target = out_col or f"credibility_weighted_{metric}"
    grouped[target] = credibility_weighted_estimate(grouped[metric], complement, z)
    return grouped
[docs]
def pool_claimants(
    self,
    claimant_col: str,
    pooling_point: float,
    *,
    amount_cols: str | list[str] | None = None,
    groupby: str | list[str] | None = None,
    amount_name: str = "total_expense",
    **kwargs: Any,
) -> pd.DataFrame:
    """Aggregate to claimant level and split each claimant into pooled/excess.

    The experience is first summarized to claimant grain with
    ``summarize_claimants`` (bound expense columns unless ``amount_cols`` is
    given, total stored under ``amount_name``). That total is then capped at
    ``pooling_point`` by :func:`actuarialpy.pool_losses`, which receives any
    extra keyword arguments and returns pooled and excess columns for capped
    experience and the excess hand-off to tail modeling.
    """
    amounts = amount_cols if amount_cols is not None else self.expense
    per_claimant = summarize_claimants(
        self.data,
        claimant_col=claimant_col,
        amount_cols=amounts,
        groupby=groupby,
        amount_name=amount_name,
    )
    return pool_losses(per_claimant, amount_name, pooling_point, **kwargs)
def _resolve_date_col(self, date_col: str | None) -> str:
resolved = date_col or self.date
if resolved is None:
raise ValueError("A date column is required. Pass date=... to Experience or date_col=... to this method.")
return resolved
def _resolve_count(self, count_col: str | None) -> str:
resolved = count_col or self.count
if resolved is None:
raise ValueError(
"A count column is required. Pass count=... to Experience or count_col=... to this method."
)
validate_columns(self.data, [resolved])
return resolved
def _resolve_exposure(self, exposure_col: str | None) -> str:
    """Return the exposure column a method must use.

    An explicit ``exposure_col`` is checked against the bound data and
    returned; otherwise the single bound exposure is used.

    Raises
    ------
    ValueError
        If ``exposure_col`` is omitted and no exposure is bound, or several
        exposures are bound.
    """
    if exposure_col is None:
        bound = self._single_exposure_or_none()
        if bound is not None:
            return bound
        raise ValueError(
            "An exposure column is required for this method. Pass exposure=... to Experience "
            "or exposure_col=... to this method."
        )
    validate_columns(self.data, [exposure_col])
    return exposure_col
def _single_exposure_or_none(self) -> str | None:
    """Return the only bound exposure column, or ``None`` when none is bound.

    Raises
    ------
    ValueError
        If more than one exposure column is bound, since the caller must then
        choose one explicitly.
    """
    bound = as_list(self.exposure)
    if len(bound) > 1:
        raise ValueError("Multiple exposures are bound. Pass exposure_col explicitly for this method.")
    return bound[0] if bound else None
def _data_with_amount(self, amount_col: str | None) -> tuple[pd.DataFrame, str]:
    """Return a frame and the name of the amount column to analyze in it.

    * An explicit ``amount_col`` is validated and returned with the bound data.
    * With exactly one bound expense column, the bound data and that column
      are returned.
    * Otherwise a copy of the data is returned with the bound expense columns
      summed (via ``sum_columns``) into ``_actuarialpy_total_expense``.
    """
    if amount_col is not None:
        validate_columns(self.data, [amount_col])
        return self.data, amount_col

    expense_cols = as_list(self.expense)
    if len(expense_cols) != 1:
        combined = "_actuarialpy_total_expense"
        # Work on a copy so the temporary total never lands on the bound frame.
        working = self.data.copy()
        working[combined] = sum_columns(working, expense_cols)
        return working, combined
    return self.data, expense_cols[0]