Source code for risksim.contracts

from __future__ import annotations

from dataclasses import dataclass
from typing import Sequence

import numpy as np

from ._validation import as_1d_float_array


[docs] @dataclass(frozen=True, slots=True) class AggregateLayer: """ Aggregate annual layer applied to simulated aggregate losses. For aggregate annual loss S, ceded loss is: C = share * min((S - attachment)+, limit) with no cap if limit is None. """ attachment: float = 0.0 limit: float | None = None share: float = 1.0 name: str | None = None def __post_init__(self) -> None: if self.attachment < 0: raise ValueError("attachment must be nonnegative") if self.limit is not None and self.limit < 0: raise ValueError("limit must be nonnegative or None") if not (0.0 <= self.share <= 1.0): raise ValueError("share must be between 0 and 1") def ceded(self, losses: np.ndarray | list[float]) -> np.ndarray: gross = as_1d_float_array(losses) recoverable = np.maximum(gross - self.attachment, 0.0) if self.limit is not None: recoverable = np.minimum(recoverable, self.limit) return self.share * recoverable def retained(self, losses: np.ndarray | list[float]) -> np.ndarray: gross = as_1d_float_array(losses) return gross - self.ceded(gross)
[docs] def attachment_probability(self, losses: np.ndarray | list[float]) -> float: """Probability that a loss reaches the layer, i.e. P(loss > attachment). This depends only on the attachment point, not on ``share`` or ``limit``, so it stays correct under degenerate parameters (e.g. ``share=0``). """ gross = as_1d_float_array(losses) return float(np.mean(gross > self.attachment))
def exhaustion_probability(self, losses: np.ndarray | list[float]) -> float | None: if self.limit is None: return None gross = as_1d_float_array(losses) return float(np.mean(gross >= self.attachment + self.limit))
[docs] class ContractProgram: """ Collection of aggregate layers applied to the same gross loss. This first version assumes the layers are intended to work together without overlap. For a standard non-overlapping tower, total ceded loss is the row-wise sum of ceded loss by layer. """ def __init__( self, layers: Sequence[AggregateLayer], name: str = "contract_program", ) -> None: if not layers: raise ValueError("layers must contain at least one AggregateLayer") self.layers = tuple(layers) self.name = name def layer_names(self) -> list[str]: names: list[str] = [] for i, layer in enumerate(self.layers): names.append(layer.name or f"layer_{i}") return names def ceded_by_layer(self, losses: np.ndarray | list[float]) -> np.ndarray: gross = as_1d_float_array(losses) cols = [layer.ceded(gross) for layer in self.layers] return np.column_stack(cols) def ceded(self, losses: np.ndarray | list[float]) -> np.ndarray: by_layer = self.ceded_by_layer(losses) return np.sum(by_layer, axis=1) def retained(self, losses: np.ndarray | list[float]) -> np.ndarray: gross = as_1d_float_array(losses) return gross - self.ceded(gross)
[docs] def apply_contract( losses: np.ndarray | list[float], contract: AggregateLayer | ContractProgram, ) -> tuple[np.ndarray, np.ndarray]: """ Return (ceded, retained) arrays for a single aggregate layer or a multi-layer contract program. """ gross = as_1d_float_array(losses) ceded = contract.ceded(gross) retained = gross - ceded return ceded, retained