Source code for finpricing.model.utils.bond_pricing_utils

from ...utils import *
import datetime
from typing import Union
import math

MINIMUM_DENOMINATOR = 1e-12


[docs] def hazard_rate_from_probs( start_p, end_p, start_date, end_date, day_count_type=DayCountTypes.ACT_ACT_ISDA ): """ Calculate the constant hazard rate given the survival probabilities at two time points. Args: start_p: Survival probability at start date end_p: Survival probability at end date start_date: Start date. end_date: End date. Returns: The hazard rate. """ day_counter = DayCount(day_count_type) delta_time = day_counter.year_fraction(start_date, end_date) return (1 / delta_time) * math.log(start_p / end_p)
# def ir_from_discount_factors( # start_B, end_B, start_date, end_date, day_count_type=DayCountTypes.ACT_ACT_ISDA # ): # day_counter = DayCount(day_count_type) # delta_time = day_counter.year_fraction(start_date, end_date) # return (1 / delta_time) * math.log(start_B / end_B)
[docs] def principal_integral( N, R, valuation_date: Union[datetime.date, Date], maturity_date: Union[datetime.date, Date], granularity_in_days: int, survival_curve, discount_curve, ): """ Calculate the principal integral. Args: N: Total notional. R: Recovery rate. end_date: End date. granularity_in_days: The granularity parameter in days. survival_curve: Function that returns the survival probability at a given date. discount_curve: Function that returns the bond price at a given date. Returns: The principal integral value. """ # Create the partition based on end_date and granularity_in_days valuation_date = Date.convert_from_datetime(valuation_date) maturity_date = Date.convert_from_datetime(maturity_date) partition = [] current_date = valuation_date while current_date <= maturity_date: partition.append(current_date) current_date = current_date.add_days(granularity_in_days) if partition[-1] != maturity_date: partition.append(maturity_date) G = len(partition) - 1 # number of intervals in the partition sum_PV = 0 for g in range(1, G + 1): tau_g_minus_1 = partition[g - 1] tau_g = partition[g] log_ratio_p = math.log( survival_curve.survival(tau_g_minus_1) / survival_curve.survival(tau_g) ) log_ratio_B = math.log( discount_curve.discount(tau_g_minus_1) / discount_curve.discount(tau_g) ) coefficient = log_ratio_p / (log_ratio_p + log_ratio_B + MINIMUM_DENOMINATOR) sum_PV += coefficient * ( survival_curve.survival(tau_g_minus_1) * discount_curve.discount(tau_g_minus_1) - survival_curve.survival(tau_g) * discount_curve.discount(tau_g) ) return N * R * sum_PV
[docs] def accrual_integral( survival_curve, discount_curve, accrual_start_date, accrual_end_date, granularity_in_days, R, day_count_type=DayCountTypes.ACT_ACT_ISDA ): """calculate the accrual of a coupon leg Args: start_date: start date of the calculation period granularity_in_days: granularity of the partition in days R: recovery rate survival_curve: survival curve discount_curve: discount curve accrual_start_date: accrual start date, the lower bound of the integral accrual_end_date: accrual end date, the upper bound of the integral Returns: The accrual value """ # start_date = Date.convert_from_datetime(start_date) accrual_start_date = Date.convert_from_datetime(accrual_start_date) accrual_end_date = Date.convert_from_datetime(accrual_end_date) day_counter = DayCount(day_count_type) partition = [] current_date = max(accrual_start_date, discount_curve.anchor_date) prev_factor = discount_curve.discount(current_date) prev_prob = survival_curve.survival(current_date) while current_date <= accrual_end_date: partition.append(current_date) current_date = current_date.add_days(granularity_in_days) if partition[-1] != accrual_end_date: partition.append(accrual_end_date) G = len(partition) - 1 # number of intervals in the partition sum_accrual = 0. for g in range(1, G + 1): partition_start = partition[g - 1] partition_end = partition[g] partition_length = day_counter.days_between(partition_start, partition_end)[0] next_factor = discount_curve.discount(partition_end) next_prob = survival_curve.survival(partition_end) h = math.log(prev_prob / next_prob) / partition_length r = math.log(prev_factor / next_factor) / partition_length h_over_r_plus_h = h / (r + h + MINIMUM_DENOMINATOR) h_over_r_plus_h_squared = h_over_r_plus_h / (r + h + MINIMUM_DENOMINATOR) prev_prob_factor = prev_prob * prev_factor next_prob_factor = next_prob * next_factor prob_factor_diff = prev_prob_factor - next_prob_factor term1 = prob_factor_diff * (h_over_r_plus_h * 0.5 + h_over_r_plus_h_squared) term2 = h_over_r_plus_h * (prev_prob_factor * day_counter.days_between(accrual_start_date, partition_start)[0] - \ next_prob_factor * day_counter.days_between(accrual_start_date, partition_end)[0]) integral_value = term1 + term2 sum_accrual += R * integral_value prev_factor = next_factor prev_prob = next_prob return sum_accrual / day_counter.days_between(accrual_start_date, accrual_end_date)[0]