Source code for finpricing.utils.interpolator
from enum import Enum
from math import log, exp
from bisect import bisect_left
from .error import NotSupportedError
[docs]
class InterpTypes(Enum):
FLAT_FWD_RATES = 1
[docs]
class CurveInterpolator:
def __init__(self):
pass
[docs]
class Interpolator(CurveInterpolator):
def __init__(self,
times,
dfs,
method: InterpTypes = InterpTypes.FLAT_FWD_RATES) -> None:
"""Interpolate discount factors
Args:
times (list): list of times (fraction of year); monotonically increasing
dfs (list): list of discount factors; same length as times
method (InterpTypes): interpolation method
"""
self._times = times
self._dfs = dfs
self._method = method
self.validate()
[docs]
def __repr__(self) -> str:
return f"Interpolator(method={self._method})"
@property
def times(self):
return self._times
@times.setter
def times(self, times):
self._times = times
@property
def dfs(self):
return self._dfs
@dfs.setter
def dfs(self, dfs):
self._dfs = dfs
[docs]
def validate(self) -> None:
"""Validate input data"""
if len(self._times) != len(self._dfs):
raise ValueError("Length of times and dfs must be the same")
if not all([t1 < t2 for t1, t2 in zip(self._times[:-1], self._times[1:])]):
raise ValueError("Times must be monotonically increasing")
if self._times[0] != 0:
raise ValueError("First time must be 0")
if self._dfs[0] != 1:
raise ValueError("First discount factor must be 1")
if self._dfs[-1] <= 0:
raise ValueError("Last discount factor must be positive")
[docs]
def eval(self, t: float) -> float:
"""Evaluate discount factor at time t
Args:
t (float): time (fraction of year)
Returns:
float: interpolated discount factor
"""
if t == 0:
return self._dfs[0]
if self._method == InterpTypes.FLAT_FWD_RATES:
idx = bisect_left(self._times, t)
if idx == 1:
rt = -log(self._dfs[1]) * t / self._times[1]
# extrapolate the rate from the last two points
elif idx == len(self._times):
r_left = -log(self._dfs[-2])
r_right = -log(self._dfs[-1])
dt = self._times[-1] - self._times[-2]
rt = ((self._times[-1] - t) * r_left + (t - self._times[-2]) * r_right) / dt
else:
r_left = -log(self._dfs[idx - 1])
r_right = -log(self._dfs[idx])
dt = self._times[idx] - self._times[idx - 1]
rt = ((self._times[idx] - t) * r_left + (t - self._times[idx - 1]) * r_right) / dt
return exp(-rt)
else:
raise NotSupportedError("Interpolation method is not supported")