Source code for tsdate.base

# MIT License
#
# Copyright (c) 2021-23 Tskit Developers
# Copyright (c) 2020-21 University of Oxford
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
Base classes and internal constants used by tsdate
"""

import numpy as np

FLOAT_DTYPE = np.float64
LIN = "linear"
LOG = "logarithmic"
GAMMA_PAR = "gamma_parameter"

#: The default value for `approx_prior_size` (see :func:`~tsdate.build_prior_grid` and
#: :func:`~tsdate.build_parameter_grid`)
DEFAULT_APPROX_PRIOR_SIZE = 10000


[docs] class NodeGridValues: """ A class to store times or discretised distributions of times for node ids. For nodes with fixed times, only a single time value needs to be stored. For non-fixed nodes, an array of len(timepoints) probabilies is required. .. note:: This class is not intended to be used directly by users and may be subject to change of name or internal structure in future versions. For details on how to create a ``NodeGridValues`` object to be used as a prior, see :ref:`sec_priors`. :ivar num_nodes: The number of nodes that will be stored in this object :vartype num_nodes: int :ivar nonfixed_nodes: a (possibly empty) numpy array of unique positive node ids each of which must be less than num_nodes. Each will have an array of grid_size associated with it. All others (up to num_nodes) will be associated with a single scalar value instead. :vartype nonfixed_nodes: numpy.ndarray :ivar timepoints: Array of time points :vartype timepoints: numpy.ndarray :ivar fill_value: What should we fill the data arrays with to start with :vartype fill_value: float """ def __init__( self, num_nodes, nonfixed_nodes, timepoints, fill_value=np.nan, dtype=FLOAT_DTYPE, ): """ :param numpy.ndarray grid: The input numpy.ndarray. """ if nonfixed_nodes.ndim != 1: raise ValueError("nonfixed_nodes must be a 1D numpy array") if np.any((nonfixed_nodes < 0) | (nonfixed_nodes >= num_nodes)): raise ValueError( "All non fixed node ids must be between zero and the total node number" ) grid_size = len(timepoints) if type(timepoints) is np.ndarray else timepoints self.timepoints = timepoints # Make timepoints immutable so no risk of overwritting them with copy self.timepoints.setflags(write=False) self.num_nodes = num_nodes self.nonfixed_nodes = nonfixed_nodes self.num_nonfixed = len(nonfixed_nodes) self.grid_data = np.full((self.num_nonfixed, grid_size), fill_value, dtype=dtype) self.fixed_data = np.full(num_nodes - self.num_nonfixed, fill_value, dtype=dtype) self.row_lookup = np.empty(num_nodes, dtype=np.int64) # non-fixed nodes get a positive value, indicating lookup in the grid_data array self.row_lookup[nonfixed_nodes] = np.arange(self.num_nonfixed) # fixed nodes get a negative value from -1, indicating lookup in the scalar array self.row_lookup[np.logical_not(np.isin(np.arange(num_nodes), nonfixed_nodes))] = ( -np.arange(num_nodes - self.num_nonfixed) - 1 ) self.probability_space = LIN def force_probability_space(self, probability_space): """ probability_space can be "logarithmic" or "linear": this function will force the current probability space to the desired type """ descr = ( self.probability_space, "probabilities into", probability_space, "space", ) if probability_space == LIN: if self.probability_space == LIN: pass elif self.probability_space == LOG: self.grid_data = np.exp(self.grid_data) self.fixed_data = np.exp(self.fixed_data) self.probability_space = LIN else: raise TypeError("Cannot force " + " ".join(descr)) elif probability_space == LOG: if self.probability_space == LOG: pass elif self.probability_space == LIN: with np.errstate(divide="ignore", invalid="ignore"): self.grid_data = np.log(self.grid_data) self.fixed_data = np.log(self.fixed_data) self.probability_space = LOG else: raise TypeError("Cannot force " + " ".join(descr)) elif probability_space == GAMMA_PAR: if self.probability_space == GAMMA_PAR: pass else: raise TypeError("Cannot force " + " ".join(descr)) else: raise ValueError(f"Bad probability space {probability_space}") def standardize(self): """ Standardize grid data so the max for each row is one (in linear space) or zero (in logarithmic space) TODO - is it clear why we omit the first element of the """ rowmax = self.grid_data[:, 1:].max(axis=1) if self.probability_space == LIN: self.grid_data = self.grid_data / rowmax[:, np.newaxis] elif self.probability_space == LOG: self.grid_data = self.grid_data - rowmax[:, np.newaxis] else: raise RuntimeError("Probability space is not", LIN, "or", LOG) def to_probabilities(self): """ Change grid data into probabilities (i.e. each row sums to one in linear or zero in logarithmic space) """ if self.probability_space != LIN: raise NotImplementedError("Can only convert to probabilities in linear space") assert not np.any(self.grid_data < 0) self.grid_data = self.grid_data / self.grid_data.sum(axis=1)[:, np.newaxis] def __getitem__(self, node_id): index = self.row_lookup[node_id] if index < 0: return self.fixed_data[1 + index] else: return self.grid_data[index, :] def __setitem__(self, node_id, value): index = self.row_lookup[node_id] if index < 0: self.fixed_data[1 + index] = value else: self.grid_data[index, :] = value def clone_with_new_data( self, grid_data=np.nan, fixed_data=None, probability_space=None ): """ Take the row indices etc from an existing NodeGridValues object and make a new similar one but with different data. If grid_data is a single number, fill the entire data array with that, otherwise assume the data is a numpy array of the correct size to fill the gridded data. If grid_data is None, fill with NaN If fixed_data is None and grid_data is a single number, use the same value as grid_data for the fixed data values. If fixed_data is None and grid_data is an array, set the fixed data to np.nan """ def fill_fixed(orig, fixed_data): if type(fixed_data) is np.ndarray: if orig.fixed_data.shape != fixed_data.shape: raise ValueError( "The fixed data array must be the same shape as the original" ) return fixed_data else: return np.full( orig.fixed_data.shape, fixed_data, dtype=orig.fixed_data.dtype ) new_obj = NodeGridValues.__new__(NodeGridValues) new_obj.num_nodes = self.num_nodes new_obj.nonfixed_nodes = self.nonfixed_nodes new_obj.num_nonfixed = self.num_nonfixed new_obj.row_lookup = self.row_lookup new_obj.timepoints = self.timepoints if type(grid_data) is np.ndarray: if self.grid_data.shape != grid_data.shape: raise ValueError( "The grid data array must be the same shape as the original" ) new_obj.grid_data = grid_data new_obj.fixed_data = fill_fixed( self, np.nan if fixed_data is None else fixed_data ) else: if grid_data == 0: # Fast allocation new_obj.grid_data = np.zeros( self.grid_data.shape, dtype=self.grid_data.dtype ) else: new_obj.grid_data = np.full( self.grid_data.shape, grid_data, dtype=self.grid_data.dtype ) new_obj.fixed_data = fill_fixed( self, grid_data if fixed_data is None else fixed_data ) if probability_space is None: new_obj.probability_space = self.probability_space else: new_obj.probability_space = probability_space return new_obj def nonfixed_dict(self): """ Return a dictionary mapping integer node ids to their data. """ return {n: self[n] for n in self.nonfixed_nodes}