Source code for finpricing.model.bond_curve_solver

from typing import Union, List
import datetime
import math
import scipy
from multiprocessing import Pool
import os
from dataclasses import dataclass
from ..utils.date import Date
from .fixed_bond_pricer import FixedBondPricer

[docs] class BondCurveAnalyticsHelper: def __init__(self, bonds) -> None: if isinstance(bonds, list): self.bonds = bonds else: raise TypeError('bonds must be a list of bonds.') self.bond_pricers = [FixedBondPricer(bond) for bond in self.bonds] self.n_underlyings = len(self.bonds) self._dirty_prices = None self._discount_curves = None self._survival_curves = None self._recovery_rates = None self._settlement_dates = None self._valuation_date = None @property def recovery_rates(self): if self._recovery_rates is None: return [0.4] * len(self.bonds) else: return self._recovery_rates @recovery_rates.setter def recovery_rates(self, values: List): if values is None: return if isinstance(values, list) and len(values) == len(self.bonds): self._recovery_rates = values elif isinstance(values, float): self._recovery_rates = [values] * len(self.bonds) else: raise TypeError('recovery_rates must be a list of floats.') @property def dirty_prices(self): if self._dirty_prices is None: raise ValueError('dirty_prices must be set before use.') else: return self._dirty_prices @dirty_prices.setter def dirty_prices(self, values: List): if values is None: return if isinstance(values, list) and len(values) == len(self.bonds): self._dirty_prices = values else: raise TypeError('dirty_prices must be a list of floats.') @property def discount_curves(self): if self._discount_curves is None: return [None] * len(self.bonds) else: return self._discount_curves @discount_curves.setter def discount_curves(self, curve): if curve is None: return if isinstance(curve, list): raise NotImplementedError('list of discount curves not implemented yet.') else: self._discount_curves = [curve] * len(self.bonds) @property def survival_curves(self): if self._survival_curves is None: return [None] * len(self.bonds) else: return self._survival_curves @survival_curves.setter def survival_curves(self, curve): if curve is None: return if isinstance(curve, list): raise NotImplementedError('list of survival curves not implemented yet.') else: self._survival_curves = [curve] * len(self.bonds) @property def settlement_dates(self): if self._settlement_dates is None: return [None] * len(self.bonds) else: return self._settlement_dates @settlement_dates.setter def settlement_dates(self, values: List): if isinstance(values, list) and len(values) == len(self.bonds): self._settlement_dates = values elif isinstance(values, datetime.date) or isinstance(values, Date): self._settlement_dates = [values] * len(self.bonds) else: raise TypeError('settlement_dates must be a list of dates.') @property def valuation_date(self): if self._valuation_date is None: raise ValueError('valuation_date must be set before use.') else: return self._valuation_date @valuation_date.setter def valuation_date(self, value): if value is None: return if isinstance(value, datetime.date) or isinstance(value, Date): self._valuation_date = value else: raise TypeError('valuation_date must be a date.') @property def maturity_dates(self): """maturity dates of all bonds in the portfolio""" return [bond.maturity_date for bond in self.bonds] @property def maturity_span(self): """maximum maturity minus minimum maturity in years""" return (max(self.maturity_dates) - min(self.maturity_dates)) / 365
[docs] def setup(self, valuation_date: Union[datetime.date, Date]=None, dirty_prices: List[float]=None, discount_curves=None, survival_curves=None, recovery_rates=None, settlement_dates=None): """setup the helper in one go""" self.valuation_date = valuation_date self.dirty_prices = dirty_prices self.discount_curves = discount_curves self.survival_curves = survival_curves self.recovery_rates = recovery_rates self.settlement_dates = settlement_dates
[docs] def get_bond_bases(self, valuation_date: Union[datetime.date, Date]=None, dirty_prices: List[float]=None, survival_curves=None, basis_type: str='AdditiveZeroRates'): if valuation_date is None: valuation_date = self.valuation_date if dirty_prices is None: dirty_prices = self.dirty_prices if survival_curves is None: survival_curves = self.survival_curves assert isinstance(dirty_prices, list), \ "dirty_prices must be a list of floats." assert isinstance(survival_curves, list), \ "survival_curves must be a list of survival curves." def f(i): return self.bond_pricers[i].solve_basis( valuation_date = valuation_date, dirty_price = dirty_prices[i], survival_curve = survival_curves[i], discount_curve = self.discount_curves[i], recovery_rate = self.recovery_rates[i], settlement_date = self.settlement_dates[i], basis_type = basis_type ) return [f(i) for i in range(len(dirty_prices))]
[docs] @dataclass class PenaltyParameter: penalty_ridge_tuning: float = 0.0001 penalize_sample_size: bool = True penalize_maturity_span: bool = True penalize_inverted_curve: bool = True penalty_inverted_tuning: float = 0.2 penalty_inverted_threshold: float = 0.01 median_dummy_curve_level: float = 0.0097304346928168781
[docs] class BondCurveSolver: def __init__(self, bondAnalyticsHelper: BondCurveAnalyticsHelper, initial_params=None, weights=None, penalty_params: PenaltyParameter=None) -> None: self.helper = bondAnalyticsHelper if initial_params is None: self.initial_params = [0.0, 0.0, 0.0] if penalty_params is None: self.penalty_params = PenaltyParameter() if weights is None: self.weights = self.get_euqal_weights() self.survival_curve_generator = self.getSurvivalCurveGenerator(self.helper)
[docs] def getSurvivalCurveGenerator(self, bondAnalyticsHelper: BondCurveAnalyticsHelper): """return a survival curve generator that can generate survival curves from parameters This is actually not a true generator, but a survival curve that has the method to recreate\ a new survival curve with the same anchor date and pivot dates but different parameters. """ survival_curves_set = set(bondAnalyticsHelper.survival_curves) if len(survival_curves_set) != 1: raise ValueError('Survival curves must be the same for all bonds to use bond curve solver.') return survival_curves_set.pop()
[docs] def get_euqal_weights(self): """return a list of equal weights for all bonds""" return [ 1. / self.helper.n_underlyings ] * self.helper.n_underlyings
[docs] @staticmethod def welsch_loss(x): c = math.sqrt(2) * 0.01 loss = 0.5 * c ** 2 * (1 - math.exp(-x ** 2 / c ** 2)) return loss
[docs] def get_penalty(self, params: List[float]): assert len(params) == 3, "params must be a list of length 3." tuning_scalar = self.penalty_params.penalty_ridge_tuning if self.penalty_params.penalize_sample_size: tuning_scalar /= len(self.helper.bonds) if self.penalty_params.penalize_maturity_span: tuning_scalar /= self.helper.maturity_span penalty = tuning_scalar * sum([param ** 2 for param in params[1:]]) # inverted curve penalty if self.penalty_params.penalize_inverted_curve: tuning_scalar = self._get_tuning_scalar() derivative_at_zero = self._get_hazard_rate_derivative_at_zero(params) penalty += tuning_scalar * max(0., -1 * derivative_at_zero) return penalty
[docs] def _get_tuning_scalar(self): tuning_factor = self.penalty_params.penalty_inverted_tuning median_dummy_curve_level = self.penalty_params.median_dummy_curve_level threshold = self.penalty_params.penalty_inverted_threshold tuning_scalar = tuning_factor / math.sinh(median_dummy_curve_level / threshold) return tuning_scalar
[docs] def _get_hazard_rate_derivative_at_zero(self, params: List[float]): pivots = self.survival_curve_generator.getSurvivalCurve(params).pivots derivative_at_zero = -1 * sum([param / (2 * pivot ** 2) for param, pivot in zip(params[1:], pivots)]) return derivative_at_zero
[docs] def solve(self, dirty_prices: List[float]=None, weights: List[float]=None, params: List[float]=None): """solve the best parameters for the survival curve that minimizes the weighted residuals""" if dirty_prices is None: dirty_prices = self.helper.dirty_prices if weights is None: weights = self.weights if params is None: params = self.initial_params res = scipy.optimize.leastsq(self.get_weighted_residuals_and_penalty, params, args = (dirty_prices, weights), xtol = 1e-8, ftol = 1e-8, full_output=False) if res[1] not in [1, 2, 3, 4]: raise RuntimeError('Optimization failed for Bond Basis Solver.') return res[0]
[docs] def get_weighted_residuals_and_penalty(self, params: List[float], dirty_prices: List[float], weights: List[float], valuation_date: Union[datetime.date, Date]=None, basis_type: str='AdditiveZeroRates'): new_survival_curve = self.survival_curve_generator.getSurvivalCurve(params) bases = self.helper.get_bond_bases(valuation_date=valuation_date, dirty_prices=dirty_prices, survival_curves=[new_survival_curve] * len(dirty_prices), basis_type=basis_type) assert len(dirty_prices) == len(weights), "Input dirty_prices and weights must have the same length." loss = [self.welsch_loss(basis) for basis in bases] weighted_loss = [ math.sqrt(weight * loss) for weight, loss in zip(weights, loss) ] regularization_penalty = weighted_loss + [ math.sqrt( self.get_penalty( params ) ) ] return regularization_penalty