Source code for tsdate.demography

# MIT License
#
# Copyright (c) 2020 University of Oxford
# Copyright (c) 2021-2023 Tskit Developers
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
Routines and classes for manipulating demographic histories in tsdate
"""

import numpy as np
import scipy.stats


[docs] class PopulationSizeHistory: """ Stores a piecewise constant population size history and tranforms time from a natural (generational) scale to a coalescent one. """ @staticmethod def _change_time_measure(time_ago, breakpoints, time_measure): """ Rescales time given a piecewise-constant time measure. To convert from generations to coalescent units, the time measure per epoch should be 2 * effective population size. To convert from coalescent units to generations, the time measure should be the coalescent rate ``1/(2 * Ne)``. :param np.ndarray time_ago: An increasing vector of time points :param np.ndarray breakpoints: Start times of pieces :param np.ndarray time_measure: Time measure within pieces :return: Inputs in new time measure """ assert np.all(np.diff(breakpoints) > 0.0) assert np.min(breakpoints) == 0.0 assert np.all(time_ago >= 0.0) assert np.all(time_measure > 0.0) assert breakpoints.size == time_measure.size index = np.searchsorted(breakpoints, time_ago, side="right") - 1 step = np.concatenate( [ [0.0], np.cumsum( breakpoints[1:] * (1.0 / time_measure[:-1] - 1.0 / time_measure[1:]) ), ] ) new_time_ago = time_ago * 1.0 / time_measure[index] + step[index] new_breakpoints = breakpoints * 1.0 / time_measure + step new_time_measure = 1.0 / time_measure return new_time_ago, new_breakpoints, new_time_measure def __init__(self, population_size, time_breaks=None): """ :param array_like population_size: An array containing diploid population sizes per epoch :param array_like time_breaks: A sorted array containing time breaks that divide epochs, measured in units of generations in the past """ if time_breaks is None: time_breaks = [] if isinstance(population_size, (int, float)): population_size = np.array([population_size], dtype=float) else: try: population_size = np.array(population_size, dtype=float) except (ValueError, TypeError) as e: raise e.__class__( "Population sizes must be convertable to a numpy float array" ) from e if not np.all(population_size > 0.0): raise ValueError("Population sizes must be greater than 0") if not np.all(np.isfinite(population_size)): raise ValueError("Population sizes must be finite") try: time_breaks = np.array(time_breaks, dtype=float) except (ValueError, TypeError) as e: raise e.__class__( "Time breaks must be convertable to a numpy float array" ) from e if not time_breaks.size == population_size.size - 1: raise ValueError( "The length of the population size array must be one less " "than the number of epoch time breaks" ) if time_breaks.size > 0: if not np.all(time_breaks > 0.0): raise ValueError("Epoch time breaks must be greater than 0") if not np.all(np.diff(time_breaks) > 0.0): raise ValueError( "Epoch time breaks must be unique and in increasing order" ) self.time_breaks = np.append([0.0], time_breaks.flatten()) self.population_size = 2 * population_size.flatten() _, coalescent_breaks, coalescent_rate = self._change_time_measure( self.time_breaks, self.time_breaks, self.population_size ) self.coalescent_breaks = coalescent_breaks self.coalescent_rate = coalescent_rate def as_dict(self): """ Return the population size history as a dictionary of parameters that can be used to initialise a new object """ ret_val = {"population_size": list(self.population_size / 2)} assert self.time_breaks[0] == 0.0 if len(self.time_breaks) > 1: ret_val["time_breaks"] = list(self.time_breaks[1:]) return ret_val def to_natural_timescale(self, coalescent_time_ago): """ Convert a vector of times from coalescent units to generations :param np.ndarray coalescent_time_ago: Times in the past, in coalescent units :return: Times in the past, in generations """ if not isinstance(coalescent_time_ago, np.ndarray): raise ValueError("Times must be in a numpy array") time_ago, _, _ = self._change_time_measure( coalescent_time_ago, self.coalescent_breaks, self.coalescent_rate, ) return time_ago def to_coalescent_timescale(self, time_ago): """ Convert a vector of times from generations to coalescent units :param np.ndarray time_ago: Times in the past, in generations :return: Times in the past, in coalescent units """ if not isinstance(time_ago, np.ndarray): raise ValueError("Times must be in a numpy array") coalescent_time_ago, _, _ = self._change_time_measure( time_ago, self.time_breaks, self.population_size, ) return coalescent_time_ago def gamma_to_natural(self, shape=1, rate=1): """ Given a gamma distribution on a coalescent timescale with parameters `shape` and `rate`, return natural parameters of a gamma approximation to the distribution under a change of measure to a generational timescale. :param float shape: Shape parameter of gamma :param float rate: Rate parameter of gamma (inverse of scale) :return: natural parameters after change of measure """ assert shape > 0, "Gamma shape parameter must be positive" assert rate > 0, "Gamma rate parameter must be positive" C = np.exp(shape * np.log(rate) - scipy.special.loggamma(shape)) gamma_cdf = scipy.special.gammainc cdf_breaks = np.append(self.coalescent_breaks, [np.inf]) cdf_0 = ( C * scipy.special.gamma(shape) / rate**shape * np.diff(gamma_cdf(shape + 0, rate * cdf_breaks)) ) mn_coef_0 = self.time_breaks - self.population_size * self.coalescent_breaks va_coef_0 = mn_coef_0**2 cdf_1 = ( C * scipy.special.gamma(shape + 1) / rate ** (shape + 1) * np.diff(gamma_cdf(shape + 1, rate * cdf_breaks)) ) mn_coef_1 = self.population_size va_coef_1 = mn_coef_0 * mn_coef_1 * 2 cdf_2 = ( C * scipy.special.gamma(shape + 2) / rate ** (shape + 2) * np.diff(gamma_cdf(shape + 2, rate * cdf_breaks)) ) va_coef_2 = mn_coef_1**2 mn = np.sum(mn_coef_1 * cdf_1 + mn_coef_0 * cdf_0) va = np.sum(va_coef_2 * cdf_2 + va_coef_1 * cdf_1 + va_coef_0 * cdf_0) va -= mn**2 new_shape = mn**2 / va new_rate = mn / va return np.array([new_shape, new_rate])
# TODO: # @staticmethod # def from_demes(filename): # """ # Create a `PopulationSizeHistory` instance from a `demes` format YAML # """ # TODO: # output a PopulationSizeHistory as a json object, so we can store it # in the provenance. See https://github.com/tskit-dev/tsdate/issues/274