Source code for msprime.demography

#
# Copyright (C) 2015-2021 University of Oxford
#
# This file is part of msprime.
#
# msprime is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# msprime is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with msprime.  If not, see <http://www.gnu.org/licenses/>.
#
"""
Module responsible for defining and debugging demographic models.
"""
from __future__ import annotations

import collections
import copy
import dataclasses
import enum
import inspect
import itertools
import json
import logging
import math
import numbers
import operator
import re
import sys
import textwrap
import warnings
from typing import Any
from typing import ClassVar
from typing import MutableMapping

import demes
import numpy as np
import tskit

from . import ancestry
from . import core
from . import species_trees


logger = logging.getLogger(__name__)


class IncompletePopulationMetadataWarning(UserWarning):
    """
    Warning raised when we don't have sufficient information to fill
    out population metadata.
    """


class LruCache(collections.OrderedDict):
    # LRU example from the OrderedDict documentation
    def __init__(self, maxsize=128, *args, **kwds):
        self.maxsize = maxsize
        super().__init__(*args, **kwds)

    def __getitem__(self, key):
        value = super().__getitem__(key)
        self.move_to_end(key)
        return value

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        if len(self) > self.maxsize:
            oldest = next(iter(self))
            del self[oldest]


_population_table_cache = LruCache(16)


def _build_population_table(populations):
    """
    Return a tskit PopulationTable instance encoding the metadata for the
    specified populations. Because encoding metadata is quite expensive
    we maintain an LRU cache.
    """
    population_metadata = []
    for population in populations:
        metadata = {
            "name": population.name,
            "description": population.description,
        }
        if population.extra_metadata is not None:
            intersection = set(population.extra_metadata.keys()) & set(metadata.keys())
            if len(intersection) > 0:
                printed_list = list(sorted(intersection))
                raise ValueError(
                    f"Cannot set standard metadata key(s) {printed_list} "
                    "using extra_metadata. Please set using the corresponding "
                    "property of the Population class."
                )
            metadata.update(population.extra_metadata)
        population_metadata.append(metadata)

    # The only thing we store in the Population table is the metadata, so
    # we cache based on this.
    key = json.dumps(population_metadata, sort_keys=True)
    if key not in _population_table_cache:
        table = tskit.PopulationTable()
        table.metadata_schema = tskit.MetadataSchema(
            {
                "codec": "json",
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "description": {"type": ["string", "null"]},
                },
                # The name and description fields are always filled out by
                # msprime, so we tell downstream tools this by making them
                # "required" by the schema.
                "required": ["name", "description"],
                "additionalProperties": True,
            }
        )
        for metadata in population_metadata:
            table.add_row(metadata=metadata)
        _population_table_cache[key] = table

    return _population_table_cache[key]


def check_num_populations(num_populations):
    """
    Check if an input number of populations is valid.
    """
    if num_populations < 1:
        raise ValueError("Must have at least one population")


def check_migration_rate(migration_rate):
    """
    Check if an input migration rate makes sense.
    """
    if migration_rate < 0:
        raise ValueError("Migration rates must be non-negative")


[docs]@dataclasses.dataclass class Population: """ A single population in a :class:`.Demography`. See the :ref:`sec_demography_populations` section for more information on what populations represent, and how they can be used. .. warning:: This class should not be instantiated directly. Please use :meth:`.Demography.add_population` method instead. """ initial_size: float = 0.0 """ The absolute size of the population at time zero. See the :ref:`sec_demography_populations_initial_size` section for more details and examples. """ growth_rate: float = 0.0 """ The exponential growth rate of the population per generation (forwards in time). Growth rates can be negative. This is zero for a constant population size, and positive for a population that has been growing. See the :ref:`sec_demography_populations_growth_rate` section for more details and examples. """ name: str | None = None """ The name of the population. If specified this must be a uniquely identifying string and must be a valid Python identifier (i.e., could be used as a variable name in Python code). See :ref:`sec_demography_populations_identifiers` for more details and recommendations on best practise. """ description: str = "" """ A concise description of the population. Defaults to the empty string if not specified. """ extra_metadata: dict = dataclasses.field(default_factory=dict) """ A JSON-encodable dictionary of metadata items to be stored in the associated tskit population object. This dictionary must not contain keys for any of the pre-defined metadata items. See the :ref:`sec_demography_populations_metadata` section for more details and examples. """ default_sampling_time: float | None = None """ The default time at which samples are drawn from this population. See the :ref:`sec_demography_populations_default_sampling_time` section for more details. """ initially_active: bool | None = None """ If True, this population will always be initially active, regardless of whether it participates in a :ref:`sec_demography_events_population_split`. If not set, or None, the initial state of the population will be set automatically depending on the events declared in the demography. See the :ref:`sec_demography_populations_life_cycle` section for more details. """ id: int | None = dataclasses.field(default=None) # noqa: A003 """ The integer ID of this population within the parent :class:`.Demography`. This attribute is assigned by the Demography class and should not be set or changed by user code. """ def asdict(self): return dataclasses.asdict(self) def validate(self): if self.initial_size < 0: raise ValueError("Negative population size") if self.name is None: raise ValueError("A population name must be set.") if not self.name.isidentifier(): raise ValueError("A population name must be a valid Python identifier")
[docs]@dataclasses.dataclass class Demography(collections.abc.Mapping): """ The definition of a demographic model for an msprime simulation, consisting of a set of populations, a migration matrix, and a list of demographic events. See the :ref:`sec_demography` section for detailed documentation on how to define, debug and simulate with demography in msprime. Please see the :ref:`sec_demography_demography_objects` section for details of how to access and update population information within a model. Demography objects implement the Python :class:`python:collections.abc.Mapping` protocol, in which the keys are **either** the population ``name`` or integer ``id`` values (see the :ref:`sec_demography_populations_identifiers` section for more information) and the values are :class:`.Population` objects. In general, population references in methods such as :meth:`.Demography.add_population_split` can either be string names or integer IDs, and the two forms can be used interchangeably. """ populations: list[Population] = dataclasses.field(default_factory=list) events: list = dataclasses.field(default_factory=list) # Until we can use numpy type hints properly, it's not worth adding them # here. We still have to add in ignores below for indexed assignment errors. migration_matrix: Any | None = None def __post_init__(self): if self.migration_matrix is None: N = self.num_populations self.migration_matrix = np.zeros((N, N)) # People might get cryptic errors from passing in copies of the same # population, so check for it. if len({id(pop) for pop in self.populations}) != len(self.populations): raise ValueError("Population objects must be distinct") # Assign the IDs and default names, if needed. for j, population in enumerate(self.populations): if population.id is not None: raise ValueError( "Population ID should not be set before using to create " "a Demography" ) population.id = j if population.name is None: population.name = f"pop_{j}" self._validate_populations()
[docs] def add_population( self, *, initial_size: float, growth_rate: float | None = None, name: str | None = None, description: str | None = None, extra_metadata: dict | None = None, default_sampling_time: float | None = None, initially_active: bool | None = None, ) -> Population: """ Adds a new :class:`.Population` to this :class:`.Demography` with the specified parameters. The new population will have ID equal to the the number of populations immediately before ``add_population`` is called, such that the first population added has ID 0, the next ID 1 and so on. If the ``name`` is not specified, this defaults to ``"pop_{id}"``. An :ref:`sec_demography_populations_initial_size` value must be specified (but may be zero). :param float initial_size: The number of individuals of the population at time zero. See the :ref:`sec_demography_populations_initial_size` section for more details and examples. :param float growth_rate: The exponential growth rate of the population. See the :ref:`sec_demography_populations_growth_rate` section for more details and examples. :param str name: The human-readable identifier for this population. If not specified, defaults to the string ``"pop_{id}"`` where ``id`` is the population's integer ID. See :ref:`sec_demography_populations_identifiers` for more details and recommendations on best practise. :param str description: A concise but informative description of what this population represents within the wider model. Defaults to the empty strings. :param dict extra_metadata: Extra metadata to associate with this population that will be stored tree sequences output by :func:`.sim_ancestry`. See the :ref:`sec_demography_populations_metadata` section for more details and examples. :param float default_sampling_time: The time at which samples will be taken from this population, if a time in not otherwise specified. By default this is determined by the details of the model, and whether populations are ancestral in :ref:`sec_demography_events_population_split` events. See the :ref:`sec_demography_populations_default_sampling_time` section for more details. :param bool initially_active: Whether this population is initially :ref:`active<sec_demography_populations_life_cycle>`. By default this is determined by the details of the model, and whether populations are ancestral in :ref:`sec_demography_events_population_split` events. See the :ref:`sec_demography_populations_life_cycle` section for more details. :returns: The new :class:`.Population` instance. :rtype: Population """ N = self.num_populations population = Population( id=N, initial_size=initial_size, growth_rate=0 if growth_rate is None else growth_rate, name=f"pop_{N}" if name is None else name, description="" if description is None else description, extra_metadata={} if extra_metadata is None else extra_metadata, default_sampling_time=default_sampling_time, initially_active=initially_active, ) self.populations.append(population) # TODO this is inefficient - we should probably store the migration # matrix in a sparse dictionary form internally. M = self.migration_matrix self.migration_matrix = np.zeros((N + 1, N + 1)) self.migration_matrix[:N, :N] = M self._validate_populations() return population
def _add_population_from_old_style( self, pop_config: PopulationConfiguration, name: str | None = None ) -> Population: population = self.add_population( name=name, initial_size=pop_config.initial_size, growth_rate=pop_config.growth_rate, ) metadata = pop_config.metadata if metadata is not None and isinstance(metadata, collections.abc.Mapping): metadata = metadata.copy() if "name" in metadata: population.name = metadata.pop("name") if name is not None and name != population.name: # Maybe this should be a warning, or just ignored entirely? raise ValueError( "Population name already set in old-style metadata " f"({name}) and doesn't match supplied name " f"({population.name})" ) if "description" in metadata: population.description = metadata.pop("description") population.extra_metadata = metadata return population def add_event(self, event: DemographicEvent) -> DemographicEvent: if not isinstance(event, DemographicEvent): raise TypeError("Events must be instances of DemographicEvent") event.demography = self self.events.append(event) return event
[docs] def set_migration_rate( self, source: str | int, dest: str | int, rate: float ) -> None: """ Sets the backwards-time rate of migration from the specified ``source`` population to ``dest`` to the specified value. This has the effect of setting ``demography.migration_matrix[source, dest] = rate``. It is the rate at which a lineage currently in ``source`` moves to ``dest`` as one follows the lineage back through time. .. important:: Note this is the :ref:`backwards in time<sec_demography_direction_of_time>`; migration rate and that ``source`` and ``dest`` are from the perspective of lineages in the coalescent process. See :ref:`sec_demography_migration` for more details and clarification on this vital point. The ``source`` and ``dest`` populations can be referred to either by their integer ``id`` or string ``name`` values. :param str,int source: The source population from which lineages originate in the backwards-time process. :param str,int dest: The destination population where lineages are move to in the backwards-time process. :param float rate: The per-generation migration rate. """ source = self[source].id dest = self[dest].id if source == dest: raise ValueError("The source and dest populations must be different") self.migration_matrix[source, dest] = rate # type: ignore
[docs] def set_symmetric_migration_rate( self, populations: list[str | int], rate: float, ) -> None: """ Sets the symmetric migration rate between all pairs of populations in the specified list to the specified value. For a given pair of population IDs ``j`` and ``k``, this sets ``demography.migration_matrix[j, k] = rate`` and ``demography.migration_matrix[k, j] = rate``. Populations may be specified either by their integer IDs or by their string names. :param list populations: An iterable of population identifiers (integer IDs or string names). :param float rate: The value to set the migration matrix entries to. """ # There's an argument for not checking this so that corner cases on # single population models can be handled. However, it's nearly always # going to be a user error where someone forgets the second population # so it seems better to raise an error to prevent hard-to-detect mistakes. if len(populations) < 2: raise ValueError("Must specify at least two populations") pop_ids = [self[identifier].id for identifier in populations] for pop_j, pop_k in itertools.combinations(pop_ids, 2): self.migration_matrix[pop_j, pop_k] = rate # type: ignore self.migration_matrix[pop_k, pop_j] = rate # type: ignore
# Demographic events. def _check_population_references(self, populations: list[str | int]): for pop_ref in populations: # Slightly unsure whether this call can get optimised out, but # there doesn't seem to be a better way to do it without duplicating # the KeyError message self[pop_ref] def _add_activate_population_event( self, time: float, *, population: str | int ) -> ActivatePopulationEvent: """ Activates a population at the specified time. The population is expected to be initially inactive. :param float time: The time at which this event occurs in generations. :param str, int population: The population to activate. """ self._check_population_references([population]) return self.add_event(ActivatePopulationEvent(time=time, population=population))
[docs] def add_population_split( self, time: float, *, derived: list[str | int], ancestral: str | int ) -> PopulationSplit: """ Adds a population split event at the specified time. In a population split event all lineages from the (more recent) derived populations move to the (more ancient) ancestral population. Forwards in time, this corresponds to the ancestral population splitting into the derived populations. See the :ref:`sec_demography_events_population_split` section for more details and examples. In addition to moving lineages from the derived population(s) into the ancestral population, a population split has the following additional effects: - All derived populations are set to :ref:`inactive<sec_demography_populations_life_cycle>`. - All migration rates to and from the derived populations are set to 0. - Population sizes and growth rates for the derived populations are set to 0. - The ``default_sampling_time`` of the ``ancestral`` :class:`.Population` is set to the time of this event, **if** the ``default_sampling_time`` for the ancestral population has not already been set. :param float time: The time at which this event occurs in generations. :param list(str, int) derived: The derived populations. :param str, int ancestral: The ancestral population. """ self._check_population_references(list(derived) + [ancestral]) pop = self[ancestral] if pop.initially_active is None: pop.initially_active = False if pop.default_sampling_time is None: pop.default_sampling_time = time return self.add_event( PopulationSplit(time=time, derived=derived, ancestral=ancestral) )
[docs] def add_admixture( self, time: float, *, derived: str | int, ancestral: list[str | int], proportions: list[float], ) -> Admixture: """ Adds an admixture event at the specified time. In an admixture event all lineages from a (more recent) ``derived`` population move to a list of (more ancient) ``ancestral`` populations according to a list of ``proportions``, such that a given lineage has a probability ``proportions[j]`` of being moved to the population ``ancestral[j]``. This movement of lineages backwards in time corresponds to the initial state of the admixed derived population the specified ``time`` being composed of individuals from the specified ``ancestral`` populations in the specified ``proportions``. See the :ref:`sec_demography_events_admixture` section for more details and examples. In addition to moving lineages from the derived population into the ancestral population(s), an admixture has the following additional effects: - The derived population is set to :ref:`inactive<sec_demography_populations_life_cycle>`. - The ancestral populations are set to :ref:`active<sec_demography_populations_life_cycle>`, if they are not already active. - All migration rates to and from the derived population are set to 0. - Population sizes and growth rates for the derived population are set to 0, and the population is marked as inactive. :param float time: The time at which this event occurs in generations. :param str, int derived: The derived population. :param list(str, int) ancestral: The ancestral populations. :param list(float) proportions: The proportion of the derived population from each of the ancestral populations at the time of the event. """ self._check_population_references(list(ancestral) + [derived]) # Useful feature here might be to support taking n - 1 proportion values # and computing 1 - sum for the last value. Could be tedious for users to # do this manually. if not math.isclose(sum(proportions), 1.0): raise ValueError("Sum of the admixture proportions must be approximately 1") return self.add_event( Admixture( time=time, derived=derived, ancestral=ancestral, proportions=proportions ) )
[docs] def add_mass_migration( self, time: float, *, source: str | int, dest: str | int, proportion: float, ) -> MassMigration: """ Adds a mass migration (or "pulse migration") event at the specified time. In a mass migration event, lineages in the ``source`` population are moved to the ``dest`` population with probability ``proportion``. Forwards-in-time, this corresponds to individuals migrating **from** population ``dest`` **to** population ``source``. Please see the :ref:`sec_demography_events_mass_migration` section for more details and examples. .. warning:: Mass migrations are an advanced feature and should only be used if the required population dynamics cannot be modelled by :ref:`sec_demography_events_population_split` or :ref:`sec_demography_events_admixture` events. .. important:: Note that ``source`` and ``dest`` are from the perspective of the coalescent process, i.e. :ref:`backwards in time<sec_demography_direction_of_time>`; please see the :ref:`sec_demography_migration` section for more details. :param float time: The time at which this event occurs in generations. :param str, int source: The population **from** which lineages are moved. :param str, int dest: The population **to** which lineages are moved. :param float proportion: For each lineage in the ``source`` population, this is the probability that it moves to the ``dest`` population. """ self._check_population_references([source, dest]) return self.add_event(MassMigration(time, source, dest, proportion))
[docs] def add_migration_rate_change( self, time: float, *, rate: float, source: int | str | None = None, dest: int | str | None = None, ) -> MigrationRateChange: """ Changes the rate of migration from one deme to another to a new value at a specific time. Migration rates are specified in terms of the rate at which lineages move from population ``source`` to ``dest`` during the progress of the simulation. .. important:: Note that ``source`` and ``dest`` are from the perspective of the coalescent process, i.e. :ref:`backwards in time<sec_demography_direction_of_time>`; please see the :ref:`sec_demography_migration` section for more details. By default, ``source=None`` and ``dest=None``, which results in all non-diagonal elements of the migration matrix being changed to the new rate. If ``source`` and ``dest`` are specified, they must refer to valid populations (either integer IDs or string names). :param float time: The time at which this event occurs in generations. :param float rate: The new per-generation migration rate. :param str, int source: The ID of the source population. :param str, int dest: The ID of the destination population. :param int source: The source population ID. """ if source is None: source = -1 else: self._check_population_references([source]) if dest is None: dest = -1 else: self._check_population_references([dest]) return self.add_event( MigrationRateChange(time=time, source=source, dest=dest, rate=rate) )
[docs] def add_symmetric_migration_rate_change( self, time: float, populations: list[str | int], rate: float ) -> SymmetricMigrationRateChange: """ Sets the symmetric migration rate between all pairs of populations in the specified list to the specified value. For a given pair of population IDs ``j`` and ``k``, this sets ``migration_matrix[j, k] = rate`` and ``migration_matrix[k, j] = rate``. Please see the :ref:`sec_demography_migration` section for more details. Populations may be specified either by their integer IDs or by their string names. :param float time: The time at which this event occurs in generations. :param list populations: An sequence of population identifiers (integer IDs or string names). :param float rate: The new migration rate. """ self._check_population_references(populations) return self.add_event( SymmetricMigrationRateChange(time=time, populations=populations, rate=rate) )
[docs] def add_population_parameters_change( self, time: float, *, initial_size: float | None = None, growth_rate: float | None = None, population: int | None = None, ) -> PopulationParametersChange: """ Changes the size parameters of a population (or all populations) at a given time. Please see the :ref:`sec_demography_populations` section for more details. :param float time: The length of time ago at which this event occurred. :param float initial_size: The number of individuals in the population at the beginning of the time slice starting at ``time``. If None, the initial_size of the population is computed according to the initial population size and growth rate over the preceding time slice. :param float growth_rate: The new per-generation growth rate. If None, the growth rate is not changed. Defaults to None. :param str, int population: The ID of the population affected. If ``population`` is None, the changes affect all populations simultaneously. """ if population is not None: self._check_population_references([population]) event = PopulationParametersChange( time, initial_size=initial_size, growth_rate=growth_rate, population=population, ) return self.add_event(event)
[docs] def add_simple_bottleneck( self, time: float, population: int | str, proportion: float | None = None, ) -> SimpleBottleneck: """ Adds a population bottleneck at the specified time in which each lineage has probability equal to ``proportion`` of coalescing into a single ancestor. Please see the :ref:`sec_demography_events_simple_bottleneck` section for more details. :param float time: The length of time ago at which this event occurred. :param str, int population: The ID of the population affected. :param float proportion: The probability of each lineage coalescing into a single ancestor. Defaults to 1.0. """ proportion = 1.0 if proportion is None else proportion self._check_population_references([population]) return self.add_event( SimpleBottleneck(time=time, population=population, proportion=proportion) )
[docs] def add_instantaneous_bottleneck( self, time: float, *, population: str | int, strength: float ) -> InstantaneousBottleneck: """ Adds a bottleneck at the specified time in the specified population that is equivalent to the coalescent process running for ``strength`` generations. Please see the :ref:`sec_demography_events_instantaneous_bottleneck` section for more details. .. note:: The :ref:`ploidy<sec_ancestry_ploidy_coalescent_time_scales>` is also use to scale the time scale of the coalescent process during the bottleneck. :param float time: The length of time ago at which this event occurred. :param str, int population: The ID of the population affected. :param float strength: The equivalent amount of time in the standard coalescent. """ self._check_population_references([population]) return self.add_event( InstantaneousBottleneck(time=time, population=population, strength=strength) )
[docs] def add_census(self, time: float) -> CensusEvent: """ Adds a "census" event at the specified time. In a census we add a node to each branch of every tree, thus recording the population that each lineage is in at the specified time. This may be used to record all ancestral haplotypes present at that time, and to extract other information related to these haplotypes: for instance to trace the local ancestry of a sample back to a set of contemporaneous ancestors, or to assess whether a subset of samples has coalesced more recently than the census time. (However, these added nodes will only represent the portions of ancestral genomes inherited by the samples, rather than complete ancestral genomes.) See :ref:`sec_ancestry_census_events` for more details. .. warning:: When used in the conjunction with the DTWF model non-integer census times should be used to guarantee that the census nodes don't coincide with coalescences (and therefore zero branch length errors). See :ref:`sec_ancestry_census_events_dtwf` for more details. :param float time: The time at which the census should occur. """ return self.add_event(CensusEvent(time))
def _populations_table(self): cols = [ ("id", ""), ("name", ""), ("description", ""), ("initial_size", ".1f"), ("growth_rate", ".2g"), ("default_sampling_time", ".2g"), ("extra_metadata", ""), ] data = [ [f"{getattr(pop, attr):{fmt}}" for attr, fmt in cols] for pop in self.populations ] return [title for title, _ in cols], data def _populations_text(self): col_titles, data = self._populations_table() alignments = ["^", "<", "<", "<", "^", ">", "<"] data = [ [ [item.as_text() if isinstance(item, core.TableEntry) else item] for item in row ] for row in data ] return core.text_table( "Populations", [[title] for title in col_titles], alignments, data ) def _populations_html(self): col_titles, data = self._populations_table() return core.html_table( f"Populations ({len(self.populations)})", col_titles, data ) def _migration_rate_info(self, source, dest, rate): extra = None if source != dest: source_name = self.populations[source].name dest_name = self.populations[dest].name extra = ( "Backwards in time migration rate from population " f"{source_name} to {dest_name} = {rate} per generation. " "Forwards in time, this is the expected number of migrants " f"moving from {dest_name} to {source_name} " f"per generation, divided by the size of {source_name}." ) return core.TableEntry(f"{rate:.4g}", extra) def _migration_matrix_table(self): col_titles = [""] + [pop.name for pop in self.populations] data = [] for j in range(self.num_populations): row = [self.populations[j].name] + [ self._migration_rate_info(j, k, self.migration_matrix[j, k]) for k in range(self.num_populations) ] data.append(row) return col_titles, data def _migration_matrix_text(self): col_titles, data = self._migration_matrix_table() alignments = ">" + "^" * self.num_populations data = [ [ [item.as_text() if isinstance(item, core.TableEntry) else item] for item in row ] for row in data ] return core.text_table( "Migration Matrix", [[title] for title in col_titles], alignments, data ) def _migration_matrix_html(self): if np.all(self.migration_matrix == 0): return core.html_table("Migration matrix (all zero)", [], []) else: col_titles, data = self._migration_matrix_table() return core.html_table("Migration matrix", col_titles, data) def _events_text(self, events, title="Events"): col_titles = [["time"], ["type"], ["parameters"], ["effect"]] alignments = "><<<" data = [] for event in events: type_text = textwrap.wrap(event._type_str, 15) description = textwrap.wrap(event._parameters(), 22) effect = textwrap.wrap(event._effect(), 38) row = [[f"{event.time:.4g}"], type_text, description, effect] data.append(row) return core.text_table( title, col_titles, alignments, data, internal_hlines=True ) def _events_html(self, events, title=None): if title is None: title = f"Events ({len(events)})" if len(self.events) == 0: return core.html_table(title, [], []) col_titles = ["time", "type", "parameters", "effect"] data = [] for event in events: class_name = event.__class__.__name__ camel_case = re.sub(r"(?<!^)(?=[A-Z])", "_", class_name).lower() add_method = f"msprime.Demography.add_{camel_case}" # TODO change this to stable when 1.0 is released. type_html = ( "<a href='https://tskit.dev/msprime/docs/latest/api.html#" f"{add_method}'>{event._type_str}</a>" ) row = [f"{event.time:.4g}", type_html, event._parameters(), event._effect()] data.append(row) return core.html_table(title, col_titles, data, no_escape=[1]) def _repr_html_(self): resolved = self.validate() return ( '<div style="margin-left:20px">' + resolved._populations_html() + resolved._migration_matrix_html() + resolved._events_html(self.events) + "</div>" ) def __str__(self): resolved = self.validate() populations = resolved._populations_text() migration_matrix = resolved._migration_matrix_text() events = resolved._events_text(self.events) def indent(table): lines = table.splitlines() s = "╟ " + lines[0] + "\n" for line in lines[1:]: s += "║ " + line + "\n" return s s = ( "Demography\n" + indent(populations) + indent(migration_matrix) + indent(events) ) return s def __len__(self): return len(self.populations) def __iter__(self): for pop in self.populations: yield pop.name def __getitem__(self, identifier): """ Returns the population with the specified ID or name. """ if isinstance(identifier, str): for population in self.populations: if population.name == identifier: return population else: raise KeyError(f"Population with name '{identifier}' not found") elif isinstance(identifier, numbers.Integral): # We don't support negative indexing here because -1 is used as # way to refer to *all* populations in demographic events, and # it would be too easy to introduce bugs in old code if we changed # the meaning of this. if identifier < 0 or identifier >= self.num_populations: raise KeyError(f"Population id {identifier} out of bounds") return self.populations[identifier] raise KeyError( "Keys must be either string population names or integer IDs:" f"identifier '{identifier}' is of type {type(identifier)}" ) @property def num_populations(self): return len(self.populations) @property def num_events(self): return len(self.events) def _validate_populations(self): names = set() for j, population in enumerate(self.populations): if population.id != j: raise ValueError( "Incorrect population ID. ID values should not be updated " "by users. Please use Demography.add_population to add extra " "populations after initialisation." ) population.validate() if population.name in names: raise ValueError(f"Duplicate population name: '{population.name}'") names.add(population.name)
[docs] def validate(self): """ Checks the demography looks sensible and raises errors/warnings appropriately, and return a copy in which all default values have been appropriately resolved. """ self._validate_populations() migration_matrix = np.array(self.migration_matrix) N = self.num_populations if migration_matrix.shape != (N, N): raise ValueError( "migration matrix must be a N x N square matrix encoded " "as a list-of-lists or numpy array, where N is the number " "of populations. The diagonal " "elements of this matrix must be zero. For example, a " "valid matrix for a 3 population system is " "[[0, 1, 1], [1, 0, 1], [1, 1, 0]]" ) last_event = None for event in self.events: if not isinstance(event, DemographicEvent): raise TypeError( "Demographic events must be a list of DemographicEvent " "instances sorted in non-decreasing order of time." ) if last_event is not None: if last_event.time > event.time: raise ValueError( "Events must be time-sorted. Please use demography.sort_events()" "if you add events out of order." ) last_event = event resolved = copy.deepcopy(self) for population in resolved.populations: if population.default_sampling_time is None: population.default_sampling_time = 0 if population.initially_active is None: population.initially_active = True return resolved
[docs] def copy(self, populations: list[str] | None = None) -> Demography: """ Returns a copy of this model. If the ``populations`` argument is specified, the populations in the copied model will be in this order. :param list populations: A list of population identifiers defining the order of the populations in the new model. If not specified, the current order is used. :return: A copy of this Demography. """ if populations is None: populations = range(self.num_populations) if len(populations) != self.num_populations: raise ValueError("populations must have Demography.num_populations entries") copy_demog = Demography() for identifier in populations: pop = self[identifier] copy_demog.add_population( initial_size=pop.initial_size, growth_rate=pop.growth_rate, name=pop.name, description=pop.description, extra_metadata=pop.extra_metadata, default_sampling_time=pop.default_sampling_time, initially_active=pop.initially_active, ) for copy_p1, copy_p2 in itertools.combinations(copy_demog.populations, 2): self_p1 = self[copy_p1.name].id self_p2 = self[copy_p2.name].id copy_demog.migration_matrix[copy_p1.id, copy_p2.id] = self.migration_matrix[ self_p1, self_p2 ] copy_demog.migration_matrix[copy_p2.id, copy_p1.id] = self.migration_matrix[ self_p2, self_p1 ] def remap(identifier): if identifier == -1: return -1 return self[identifier].name for event in self.events: copy_event = copy.deepcopy(event) if isinstance(event, (MassMigration, MigrationRateChange)): copy_event.source = remap(event.source) copy_event.dest = remap(event.dest) elif isinstance(event, PopulationSplit): copy_event.derived = [remap(pop) for pop in event.derived] copy_event.ancestral = remap(event.ancestral) elif isinstance(event, Admixture): copy_event.ancestral = [remap(pop) for pop in event.ancestral] copy_event.derived = remap(event.derived) elif isinstance( event, (PopulationParametersChange, SimpleBottleneck, InstantaneousBottleneck), ): copy_event.population = remap(event.population) elif isinstance(event, SymmetricMigrationRateChange): copy_event.populations = [remap(pop) for pop in event.populations] elif isinstance(event, CensusEvent): # Census has no population pass else: raise AssertionError("Event class not implemented in copy") copy_demog.add_event(copy_event) return copy_demog
def sort_events(self): # Sort demographic events by time. Sorting is stable so the relative # order of events at the same time will be preserved. self.events.sort(key=lambda de: de.time) def insert_populations(self, tables): """ Insert population definitions for this demography into the specified set of tables. :meta private: """ assert len(tables.populations) == 0 population_table = _build_population_table(self.populations) tables.populations.metadata_schema = population_table.metadata_schema tables.populations.set_columns( metadata=population_table.metadata, metadata_offset=population_table.metadata_offset, ) def insert_extra_populations(self, tables): """ Insert additional population definitions for this demography into the specified set of tables. We assume that the populations up to len(tables.populations) are identical, and append additional populations to the tables for any remaining. :meta private: """ # TODO we should be accessing a higher-level API for querying # the schema here, but there's none available right now. schema = tables.populations.metadata_schema.schema if schema is not None: properties = schema.get("properties", {}) additional_properties = schema.get("additionalProperties", True) name_in_metadata = "name" in properties or additional_properties description_in_metadata = ( "description" in properties or additional_properties ) if not name_in_metadata: warnings.warn( "The metadata schema does not have a 'name' property; " "population names will not be recorded in the output " "tree sequence", IncompletePopulationMetadataWarning, stacklevel=2, ) if not description_in_metadata: warnings.warn( "The metadata schema does not have a 'description' property; " "population descriptions will not be recorded in the output " "tree sequence", IncompletePopulationMetadataWarning, stacklevel=2, ) left_out = [] for population in self.populations[len(tables.populations) :]: md = {} if name_in_metadata: md["name"] = population.name if description_in_metadata: md["description"] = population.description for k in population.extra_metadata: if k in properties or additional_properties: md[k] = population.extra_metadata[k] else: left_out.append(k) tables.populations.add_row(metadata=md) if len(left_out) > 0: warnings.warn( "The metadata schema does not allow for all properties specified " "in extra_metadata: these keys have not been recorded in the " f"output tree sequence: {', '.join(left_out)}", IncompletePopulationMetadataWarning, stacklevel=2, ) else: warnings.warn( "No metadata schema present in population table, not recording " "metadata", IncompletePopulationMetadataWarning, stacklevel=2, ) # No metadata schema, just add bare populations. for _ in self.populations[len(tables.populations) :]: tables.populations.add_row() def asdict(self): # NOTE: this slightly contradicts the interpretation of the # Demography object as a mapping storing populations. But # this is an internal undocumented method, so seems OK. return { "populations": [pop.asdict() for pop in self.populations], "events": [event.asdict() for event in self.events], "migration_matrix": self.migration_matrix.tolist(), }
[docs] def debug(self): """ Returns a :class:`.DemographyDebugger` instance for this demography. :return: A DemographyDebugger object for this demography. :rtype: msprime.DemographyDebugger """ return DemographyDebugger(demography=self)
def __eq__(self, other): try: self.assert_equal(other) return True except AssertionError: return False
[docs] def assert_equal(self, other: Demography): """ Compares this Demography with specified ``other`` and raises an AssertionError if they are not exactly equal. :param Demography other: The other demography to compare against. """ # TODO we could potentially do better here with error messages # by showing a diff of the str() values for objects that differ. assert isinstance(other, Demography) assert self.num_populations == other.num_populations for p1, p2 in zip(self.populations, other.populations): assert p1 == p2, f"{p1}{p2}" assert np.array_equal( self.migration_matrix, other.migration_matrix ) # type: ignore assert self.num_events == other.num_events for e1, e2 in zip(self.events, other.events): assert e1 == e2, f"{e1}{e2}"
[docs] def is_equivalent(self, other: Demography, rel_tol=None, abs_tol=None): """ Compares this demography with the other and return True if they are equivalent up to the specified numerical tolerances. Two demographies are equivalent if, they have the same set of epochs defined by demographic events, and for each epoch: - The population's ``initial_size``, ``growth_rate`` and ``active`` values are equal in all populations. - The migration matrices are equal - The same sequence of lineage movements through population splits, etc. All numerical comparisons are performed using :func:`python:math.isclose`. :param Demography other: The other demography to compare against. :param float rel_tol: The relative tolerance used by math.isclose. :param float abs_tol: The relative tolerance used by math.isclose. :return: True if this demography and other are equivalent up to numerical tolerances. :rtype bool: bool """ try: self.assert_equivalent(other, rel_tol=rel_tol, abs_tol=abs_tol) return True except AssertionError: return False
def assert_equivalent( self, other: Demography, rel_tol: None | float = None, abs_tol: None | float = None, ): # Same defaults as math.isclose rel_tol = 1e-9 if rel_tol is None else rel_tol abs_tol = 0 if abs_tol is None else abs_tol assert isinstance(other, Demography) self_dbg = self.debug() other_dbg = other.debug() if self.num_populations != other.num_populations: raise AssertionError( "Number of populations not equal: " f"{self.num_populations}{other.num_populations}" ) # Compare the population attributes. # NB use the *resolved* versions from the debug objects for self_pop, other_pop in zip( self_dbg.demography.populations, other_dbg.demography.populations ): if self_pop.name != other_pop.name: raise AssertionError( f"Population names differ: {self_pop.name}{other_pop.name}" ) self_st = ( 0 if self_pop.default_sampling_time is None else self_pop.default_sampling_time ) other_st = ( 0 if other_pop.default_sampling_time is None else other_pop.default_sampling_time ) if not math.isclose(self_st, other_st): raise AssertionError( f"Sampling times not equal for {self_pop.name}: " f"{self_pop.default_sampling_time} ≠ " f"{other_pop.default_sampling_time}" ) if self_dbg.num_epochs != other_dbg.num_epochs: raise AssertionError( "Number of epochs not equal: " f"{self_dbg.num_epochs}{other_dbg.num_epochs}" ) for j, (self_epoch, other_epoch) in enumerate( zip(self_dbg.epochs, other_dbg.epochs) ): if not math.isclose( self_epoch.start_time, other_epoch.start_time, rel_tol=rel_tol, abs_tol=abs_tol, ): raise AssertionError( f"Epoch[{j}] at different times: " f"{self_epoch.start_time}{other_epoch.start_time}" ) for self_pop, other_pop in zip( self_epoch.populations, other_epoch.populations ): if self_pop.state != other_pop.state: raise AssertionError( f"State mismatch in populations in epoch[{j}], {self_pop.name}: " f"{self_pop.state}{other_pop.state}" ) if self_pop.state == PopulationStateMachine.ACTIVE: if not math.isclose( self_pop.start_size, other_pop.start_size, rel_tol=rel_tol, abs_tol=abs_tol, ): raise AssertionError( "Population start_size not equal to required precision " f"in epoch[{j}], {self_pop.name}: " f"{self_pop.start_size}{other_pop.start_size}" ) if not math.isclose( self_pop.growth_rate, other_pop.growth_rate, rel_tol=rel_tol, abs_tol=abs_tol, ): raise AssertionError( "Population growth_rate not equal to required precision " f"in epoch[{j}], {self_pop.name}: " f"{self_pop.growth_rate}{other_pop.growth_rate}" ) m_equal = np.isclose( self_epoch.migration_matrix, other_epoch.migration_matrix, rtol=rel_tol, atol=abs_tol, ) if not np.all(m_equal): differs = "Differences between: " for source_id, dest_id in zip(*np.where(~m_equal)): source = self[source_id].name dest = self[dest_id].name self_rate = self_epoch.migration_matrix[source_id, dest_id] other_rate = other_epoch.migration_matrix[source_id, dest_id] differs += f"({source}, {dest}: {self_rate}{other_rate}), " raise AssertionError( f"Migration matrices in epoch[{j}] not equal: \n" "self = \n" f"{self_epoch.migration_matrix}\n" "other = \n" f"{other_epoch.migration_matrix}\n" f"::{differs[:-2]}" ) self._assert_lineage_movements_equivalent( self_epoch.events, other_epoch.events, rel_tol=rel_tol, abs_tol=abs_tol ) self_state_change = [ event for event in self_epoch.events if isinstance(event, StateChangeEvent) ] other_state_change = [ event for event in other_epoch.events if isinstance(event, StateChangeEvent) ] if len(self_state_change) > 0 or len(other_state_change) > 0: raise ValueError( "State change events not currently supported in equivalent. " "Please open an issue on GitHub" ) def _assert_lineage_movements_equivalent( self, self_events, other_events, *, rel_tol, abs_tol ): self_lineage_movements = self._normalise_lineage_movements(self_events) other_lineage_movements = self._normalise_lineage_movements(other_events) source_pops = set(self_lineage_movements.keys()) if source_pops != set(other_lineage_movements.keys()): raise AssertionError( f"Mismatch in the set of populations affected by lineage movements: " f"{source_pops}{set(other_lineage_movements.keys())}" ) for source in source_pops: self_out_movements = self_lineage_movements[source] other_out_movements = other_lineage_movements[source] if len(self_out_movements) != len(other_out_movements): raise AssertionError( "Mismatch in number of normalised lineage movements out of " f"{source}: {len(self_out_movements)}{len(other_out_movements)}" ) for self_lm, other_lm in zip(self_out_movements, other_out_movements): if self_lm.dest != other_lm.dest: raise AssertionError( "Mismatch in lineage movement destination:" f"{self_lm.dest}{other_lm.dest}" ) if not math.isclose( self_lm.proportion, other_lm.proportion, rel_tol=rel_tol, abs_tol=abs_tol, ): raise AssertionError( "Mismatch in normalised lineage movement proportions " f"from {self_lm.source} to {self_lm.dest}: " f"{self_lm.proportion}{other_lm.proportion}" ) def _normalise_lineage_movements(self, events: list[DemographicEvent]): """ Extract the LineageMovementEvent instances from the specified list and normalise their effects into LineageMovement instances, and return a dictionary mapping source populations to the list of sequential lineage movements. For each source population we have a list of sequential lineage movements, sorted by destination population ID. """ assert len({event.time for event in events}) <= 1 ret = collections.defaultdict(list) for event in events: if isinstance(event, LineageMovementEvent): for move in event._as_lineage_movements(): ret[move.source].append(move) for pop in list(ret.keys()): # We have a list of *conditional* lineage movements out of a # population. We canonicalise this by sorting by the # destination population, so we have to first convert back to # absolute proportions. assert all(lm.source == pop for lm in ret[pop]) P = _sequential_to_proportions([pm.proportion for pm in ret[pop]]) id_value_pairs = sorted((lm.dest, p) for lm, p in zip(ret[pop], P)) S = _proportions_to_sequential([p for _, p in id_value_pairs]) ret[pop] = [ LineageMovement(source=pop, dest=id_value_pairs[j][0], proportion=S[j]) for j in range(len(S)) ] return ret
[docs] @staticmethod def from_species_tree( tree, initial_size, *, time_units="gen", generation_time=None, growth_rate=None, ) -> Demography: """ Parse a species tree in `Newick <https://en.wikipedia.org/wiki/Newick_format>`_ format and return the corresponding :class:`Demography` object. The tree is assumed to be rooted and ultrametric and branch lengths must be included and correspond to time, either in units of millions of years, years, or generations. The returned :class:`.Demography` object contains a :class:`.Population` for each node in the species tree. The population's ``name`` attribute will be either the corresponding node label from the newick tree, if it exists, or otherwise the name takes the form "pop_{j}", where j is the position of the given population in the list. Leaf populations are first in the list, and added in left-to-right order. Populations corresponding to the internal nodes are then added in a postorder traversal of the species tree. For each internal node a :ref:`sec_demography_events_population_split` event is added so that lineages move from its child populations at the appropriate time and rates of continuous migration to and from the child populations is set to zero. See the :ref:`sec_demography_events_population_split` section for more details. The initial sizes and growth rates for the populations in the model are set via the ``initial_size`` and ``growth_rate`` arguments. These can be specified in two ways: if a single number is provided, this is used for all populations. The argument may also be a mapping from population names to their respective values. For example: .. code-block:: python tree = "(A:10.0,B:10.0)C" initial_size = {"A": 1000, "B": 2000, "C": 100} demography = msprime.Demography.from_species_tree(tree, initial_size) Note that it is possible to have default population sizes for unnamed ancestral populations using a :class:`python:collections.defaultdict`, e.g., .. code-block:: python tree = "(A:10.0,B:10.0)" initial_size = collections.defaultdict(lambda: 100) initial_size.update({"A": 1000, "B": 2000}) demography = msprime.Demography.from_species_tree(tree, initial_size) :param str tree: The tree string in Newick format, with named leaves and branch lengths. :param initial_size: Each population's initial_size, in numbers of individuals. May be a single number or a mapping from population names to their sizes. :param growth_rate: Each population's growth_rate. May be a single number or a mapping from population names to their exponential growth rates. Defaults to zero. :param str time_units: The units of time in which the species tree's branch lengths are measured. Allowed branch length units are millions of years, years, and generations; these should be specified with the strings ``"myr"``, ``"yr"``, or ``"gen"``, respectively. This defaults to ``"gen"``. :param float generation_time: The number of years per generation. If and only if the branch lengths are not in units of generations, the generation time must be specified. This defaults to `None`. :return: A Demography object representing the specified species tree. :rtype: msprime.Demography """ return species_trees.parse_species_tree( tree, initial_size=initial_size, growth_rate=growth_rate, time_units=time_units, generation_time=generation_time, )
[docs] @staticmethod def from_starbeast(tree, generation_time, time_units="myr") -> Demography: """ Parse a species tree produced by the program `TreeAnnotator <https://www.beast2.org/treeannotator>`_ based on a posterior tree distribution generated with `StarBEAST <https://academic.oup.com/mbe/article/34/8/2101/3738283>`_ and return the corresponding Demography object. Species trees produced by TreeAnnotator are written in `Nexus <https://en.wikipedia.org/wiki/Nexus_file>`_ format and are rooted, bifurcating, and ultrametric. Branch lengths usually are in units of millions of years, but the use of other units is permitted by StarBEAST (and thus TreeAnnotator). This function allows branch length units of millions of years or years. Leaves must be named and the tree must include information on population sizes of leaf and ancestral species in the form of annotation with the "dmv" tag, which is the case for trees written by TreeAnnotator based on StarBEAST posterior tree distributions. The returned :class:`.Demography` object contains a :class:`.Population` for each node in the species tree. The population's ``name`` attribute will be either the corresponding node label from the newick tree, if it exists, or otherwise the name takes the form "pop_{j}", where j is the position of the given population in the list. Leaf populations are first in the list, and added in left-to-right order. Populations corresponding to the internal nodes are then added in a postorder traversal of the species tree. For each internal node a :ref:`sec_demography_events_population_split` event is added so that lineages move from its child populations at the appropriate time and rates of continuous migration to and from the child populations is set to zero. See the :ref:`sec_demography_events_population_split` section for more details. :param str tree: The tree string in Nexus format, with named leaves, branch lengths, and branch annotation. Typically, this string is the entire content of a file written by TreeAnnotator. :param float generation_time: The number of years per generation. :param str time_units: The units of time in which the species tree's branch lengths are measured. Allowed branch length units are millions of years, and years; these should be specified with the strings ``"myr"`` or ``"yr"``, respectively. This defaults to ``"myr"``. :return: A :class:`.Demography` instance that describing the information in the specified species tree. :rtype: msprime.Demography """ return species_trees.parse_starbeast( tree=tree, generation_time=generation_time, time_units=time_units, )
def _from_old_style_map_populations( population_configurations: list[PopulationConfiguration], migration_matrix: list[list[float]], demographic_events: list[DemographicEvent], population_map: [list[dict[int, str]]], ) -> Demography: direct_model = Demography._from_old_style_simple( population_configurations, migration_matrix, demographic_events ) for id_map in population_map: if len(set(id_map.values())) != len(id_map): raise ValueError("Population IDs in old model must be unique") for old_id in id_map.values(): if old_id < 0 or old_id >= direct_model.num_populations: raise ValueError( f"Bad population reference {old_id} in old style model" ) if len(population_map[0]) != len(population_configurations): raise ValueError( "The ID map for the first epoch must have entries for all " "populations in the old-style model" ) # Suppress misspecification warnings; we'll give more specific messages # later. with warnings.catch_warnings(): warnings.simplefilter("ignore") dbg = direct_model.debug() if dbg.num_epochs != len(population_map): raise ValueError( "Mismatch in the number of epochs in the old style model " f"({dbg.num_epochs}) and specified in the population ID map " f"({len(population_map)})" ) demography = Demography() # Set up the initial state of the model id_map = population_map[0] for new_name, old_id in id_map.items(): pc = population_configurations[old_id] demography._add_population_from_old_style(pc, new_name) for epoch_id_map in population_map[1:]: for name in epoch_id_map.keys(): if name not in demography: demography.add_population(name=name, initial_size=0) # Fill out migration matrix for pop_1, pop_2 in itertools.combinations(id_map.keys(), 2): for pop_j, pop_k in [(pop_1, pop_2), (pop_2, pop_1)]: j = id_map[pop_j] k = id_map[pop_k] demography.set_migration_rate( source=pop_j, dest=pop_k, rate=direct_model.migration_matrix[j, k] ) # Set the state of populations and migration rates epoch by epoch. for epoch in dbg.epochs[1:]: epoch_id_map = population_map[epoch.index] # Set the population sizes and growth_rates for the extant populations. for new_id, old_id in epoch_id_map.items(): pop_state = epoch.populations[old_id] demography.add_population_parameters_change( epoch.start_time, population=new_id, initial_size=pop_state.start_size, growth_rate=pop_state.growth_rate, ) # Set the migration rates. for pop_1, pop_2 in itertools.combinations(epoch_id_map.keys(), 2): for pop_j, pop_k in [(pop_1, pop_2), (pop_2, pop_1)]: j = epoch_id_map[pop_j] k = epoch_id_map[pop_k] demography.add_migration_rate_change( time=epoch.start_time, source=pop_j, dest=pop_k, rate=epoch.migration_matrix[j][k], ) # Add splits/admixtures/pulses according to the mass migrations in the # original model and ID maps. last_epoch_id_map = population_map[0] for epoch in dbg.epochs: logger.debug( f"Converting epoch[{epoch.index}] " f"{epoch.start_time:.2f}-{epoch.end_time:.2f}" ) # New name -> old ID epoch_id_map = population_map[epoch.index] # Old ID -> new name old_id_map = {value: key for key, value in epoch_id_map.items()} last_epoch_pops = set(last_epoch_id_map.keys()) epoch_pops = set(epoch_id_map.keys()) derived = set() events = copy.deepcopy(epoch.events) old_derived_ids = [] if last_epoch_pops != epoch_pops: ancestral = epoch_pops - last_epoch_pops derived = last_epoch_pops - epoch_pops old_derived_ids = [] if len(ancestral) == 1: logger.debug( f"Adding split ancestral={ancestral} derived={derived}" ) ancestral = ancestral.pop() demography.add_population_split( time=epoch.start_time, derived=list(derived), ancestral=ancestral, ) old_derived_ids = [last_epoch_id_map[pop] for pop in derived] derived_mass_migrations = 0 for event in events: if isinstance(event, MassMigration): if event.source in old_derived_ids: derived_mass_migrations += 1 if event.proportion != 1: raise ValueError( "MassMigration associated with population split " "with proportion != 1" ) # This check is weak, but it will catch some errors at least. # We're assuming the old model is well-formed, so this should # help catch errors where the id map is slightly off. if derived_mass_migrations < len(old_derived_ids) - 1: raise ValueError( "Insufficient MassMigrations found for population split" ) elif len(derived) == 1: derived = derived.pop() ancestral = [] sequential_proportions = [] old_derived_id = last_epoch_id_map[derived] for event in events: if isinstance(event, MassMigration): if event.source == old_derived_id: ancestral.append(old_id_map[event.dest]) sequential_proportions.append(event.proportion) proportions = _sequential_to_proportions(sequential_proportions) if math.isclose(sum(proportions), 1): if len(ancestral) == 1: # This is a single split from the ancestral population, # which continues to exist. We need to override the # defaults for the behaviour we want here. logger.debug( f"Adding single split ancestral={ancestral[0]} " f"derived={derived}" ) pop = demography[ancestral[0]] pop.initially_active = True pop.default_sampling_time = None demography.add_population_split( time=epoch.start_time, derived=[derived], ancestral=ancestral[0], ) else: logger.debug( f"Adding admixture ancestral={ancestral} " f"derived={derived} proportions={proportions}" ) demography.add_admixture( time=epoch.start_time, derived=derived, ancestral=ancestral, proportions=proportions, ) old_derived_ids = [old_derived_id] else: raise ValueError( "Admixture or single population split implied by ID map " "but absolute population proportions don't sum to 1" ) for event in events: if isinstance(event, MassMigration): if event.source not in old_derived_ids: demography.add_mass_migration( time=epoch.start_time, source=old_id_map[event.source], dest=old_id_map[event.dest], proportion=event.proportion, ) elif not isinstance( event, (MigrationRateChange, PopulationParametersChange) ): raise ValueError( "Only MassMigration, MigrationRateChange and " "PopulationParametersChange events are supported" ) # Go through the inactive populations and check the migration # rates make sense. It's an error to migrate out of an inactive # population and a warning to migrate out of inactive pops. pairs = itertools.combinations(range(len(epoch.populations)), 2) active = set(epoch_id_map.values()) for pop_1, pop_2 in pairs: for source, dest in [(pop_1, pop_2), (pop_2, pop_1)]: if epoch.migration_matrix[source, dest] != 0: if source in active and dest not in active: raise ValueError( "Non zero migration from an active population " f"({source}) to inactive ({dest})" ) if source not in active: warnings.warn( "Migration out of inactive population " f"({source}) to ({dest}). This may be an " "error in your model.", stacklevel=2, ) last_epoch_id_map = epoch_id_map demography.sort_events() return demography @staticmethod def _from_old_style_simple( population_configurations=None, migration_matrix=None, demographic_events=None, ) -> Demography: """ Creates a Demography object from the pre 1.0 style input parameters, reproducing the old semantics with respect to default values. """ demography = Demography() for pop_config in population_configurations: name = None if pop_config.metadata is not None: # If there's a name defined in the old-style metadata use that name = pop_config.metadata.get("name", None) demography._add_population_from_old_style(pop_config, name) if migration_matrix is not None: demography.migration_matrix = np.array(migration_matrix) if demographic_events is not None: for event in demographic_events: demography.add_event(copy.deepcopy(event)) return demography
[docs] @staticmethod def from_old_style( population_configurations=None, *, migration_matrix=None, demographic_events=None, Ne=1, ignore_sample_size=False, population_map: [list[dict[int, str | int]]] | None = None, ) -> Demography: """ Creates a Demography object from the pre 1.0 style input parameters, reproducing the old semantics with respect to default values. No sample information is stored in the new-style :class:`.Demography` objects, and therefore if the ``sample_size`` attribute of any of the input :class:`.PopulationConfiguration` objects is set a ValueError will be raised by default. However, if the ``ignore_sample_size`` parameter is set to True, this check will not be performed and the sample sizes specified in the old-style :class:`.PopulationConfiguration` objects will be ignored. Each :class:`.PopulationConfiguration` instance in the list of ``population_configurations`` corresponds to the equivalent :class:`.Population` object in the returned :class:`.Demography`. If a PopulationConfiguration has ``metadata`` defined and this dictionary contains a ``name`` field, this will be used as the :class:`.Population` name. Otherwise, the default population names will be used. Please see the :ref:`sec_ancestry_samples` section for details on how to specify sample locations in :func:`.sim_ancestry`. .. todo:: Document the remaining parameters. """ if population_configurations is None: pop_configs = [PopulationConfiguration(initial_size=Ne)] else: pop_configs = copy.deepcopy(population_configurations) for pop_config in pop_configs: if pop_config.initial_size is None: pop_config.initial_size = Ne if pop_config.sample_size is not None and not ignore_sample_size: raise ValueError( "You have specified a `sample_size` in a " "PopulationConfiguration object that is to be converted " "into a new-style Demography object, " "which does not contain any information about samples. " "Please use the ``samples`` argument to sim_ancestry " "instead, which provides flexible options for sampling " "from different populations" ) if population_map is None: return Demography._from_old_style_simple( pop_configs, migration_matrix, demographic_events ) else: return Demography._from_old_style_map_populations( pop_configs, migration_matrix, demographic_events, population_map, )
[docs] @staticmethod def from_demes(graph: demes.Graph) -> Demography: """ Creates a :class:`.Demography` object from the specified :ref:`demes graph <demes:sec_introduction>`. Time values in the demes graph may be specified in any units, but the returned object has units converted into generations. See the :ref:`sec_demography_importing_demes` section for details. .. code:: import demes graph = demes.load("model.yaml") demography = msprime.Demography.from_demes(graph) :param demes.Graph graph: A demes graph. :return: A :class:`.Demography` instance corresponding to the demes model. :rtype: msprime.Demography """ # Check for Demes features that we don't support. for deme in graph.demes: for epoch in deme.epochs: if epoch.selfing_rate != 0: raise ValueError( f"deme {deme.name}: non-zero selfing_rate not supported" ) if epoch.cloning_rate != 0: raise ValueError( f"deme {deme.name}: non-zero cloning_rate not supported" ) def get_growth_rate(epoch): ret = 0 if epoch.size_function not in ["constant", "exponential"]: raise ValueError( "msprime only supports constant or exponentially changing " "population sizes" ) if epoch.end_size != epoch.start_size: ret = -math.log(epoch.start_size / epoch.end_size) / epoch.time_span return ret graph = graph.in_generations() events = graph.discrete_demographic_events() demography = Demography() for deme in graph.demes: initial_size = 0 growth_rate = 0 last_epoch = deme.epochs[-1] initially_active = False to_activate = False if last_epoch.end_time == 0: initial_size = last_epoch.end_size growth_rate = get_growth_rate(last_epoch) initially_active = True else: # add activation if needed # needed if isn't the ancestor of any split or merger to_activate = True for split in events["splits"]: if deme.name == split.parent: to_activate = False for merger in events["mergers"]: if deme.name in merger.parents: to_activate = False demography.add_population( name=deme.name, description=deme.description, growth_rate=growth_rate, initial_size=initial_size, default_sampling_time=deme.end_time, initially_active=initially_active, ) if to_activate: demography._add_activate_population_event( time=deme.end_time, population=deme.name ) for epoch in reversed(deme.epochs): new_initial_size = None if initial_size != epoch.end_size: new_initial_size = epoch.end_size new_growth_rate = None alpha = get_growth_rate(epoch) if growth_rate != alpha: new_growth_rate = alpha if new_growth_rate is not None or new_initial_size is not None: demography.add_population_parameters_change( population=deme.name, time=epoch.end_time, initial_size=new_initial_size, growth_rate=new_growth_rate, ) initial_size = new_initial_size growth_rate = new_growth_rate for split in events.pop("splits"): demography.add_population_split( time=split.time, ancestral=split.parent, derived=split.children ) for branch in events.pop("branches"): demography.add_population_split( time=branch.time, ancestral=branch.parent, derived=[branch.child], ) # Add pulses in reverse order, so that pulses with the same time # correspond to the correct backwards-time mass migration ordering. for pulse in reversed(events.pop("pulses")): sequential_props = _proportions_to_sequential(pulse.proportions) for prop, source in zip(sequential_props, pulse.sources): demography.add_mass_migration( time=pulse.time, source=pulse.dest, dest=source, proportion=prop, ) for merger in events.pop("mergers"): demography.add_admixture( time=merger.time, derived=merger.child, ancestral=merger.parents, proportions=merger.proportions, ) for admixture in events.pop("admixtures"): demography.add_admixture( time=admixture.time, derived=admixture.child, ancestral=admixture.parents, proportions=admixture.proportions, ) assert len(events) == 0 # Turn migrations off at the start_time. We schedule all start_time # events first, and then all end_time events. This ensures that events # for migrations with an end_time that coincides with the start_time of # another migration will be scheduled later (backwards in time), # and thus override the rate=0 setting. for migration in graph.migrations: if not math.isinf(migration.start_time): demography.add_migration_rate_change( time=migration.start_time, source=migration.dest, dest=migration.source, rate=0, ) for migration in graph.migrations: if migration.end_time == 0: demography.set_migration_rate( source=migration.dest, dest=migration.source, rate=migration.rate ) else: demography.add_migration_rate_change( time=migration.end_time, source=migration.dest, dest=migration.source, rate=migration.rate, ) demography.sort_events() return demography
[docs] @staticmethod def from_tree_sequence( ts: tskit.TreeSequence, initial_size: float = 0 ) -> Demography: """ Creates a :class:`.Demography` object based on the information in the specified :class:`tskit.TreeSequence`. The returned demography will contain a population for each of the populations in the tree sequence, in the same order. The metadata for each population in the tree sequence will be inspected. If a schema is present and the metadata can be decoded, the ``name`` and ``description`` properties of populations are set if the corresponding keys are present. If the metadata cannot be decoded, the default values for ``name`` and ``description`` are used. The ``initial_size`` of each of the new populations is set to zero by default, and all other :class:`.Population` attributes are set to their default values. It is therefore essential to update the ``initial_size`` and ``growth_rate`` values to reflect the desired demography. .. seealso:: See the :ref:`initial state<sec_ancestry_initial_state_demography>` section for examples of how this method can be used. :param tskit.TreeSequence ts: The tree sequence to extract population information from. :param float initial_size: The default initial size for the newly added populations, as a number of individuals (Default=0). :return: A Demography object representing the populations in the specified tree sequence. :rtype: msprime.Demography """ demography = Demography() for population in ts.populations(): name = None description = None if isinstance(population.metadata, collections.abc.Mapping): # We have decoded metadata, extract some useful information # from it. name = population.metadata.get("name") description = population.metadata.get("description") demography.add_population( initial_size=initial_size, name=name, description=description ) return demography
[docs] @staticmethod def isolated_model(initial_size, *, growth_rate=None) -> Demography: """ Returns a :class:`.Demography` object representing a collection of isolated populations with specified initial population sizes and growth rates. Please see :ref:`sec_demography` for more details on population sizes and growth rates. :param list initial_size: the ``initial_size`` value for each of the :class:`.Population` in the returned model. The length of the array corresponds to the number of populations. model. :param list growth_rate: The exponential growth rate for each population. Must be either None (the default, resulting a zero growth rate) or an array with the same length as ``initial_size``. :return: A Demography object representing this model, suitable as input to :func:`.sim_ancestry`. :rtype: msprime.Demography """ initial_size = np.array(initial_size, dtype=np.float64) if len(initial_size.shape) != 1: raise ValueError( "The initial_size argument must a 1D array of population size values" ) if growth_rate is None: growth_rate = np.zeros_like(initial_size) else: growth_rate = np.array(growth_rate, dtype=np.float64) if initial_size.shape != growth_rate.shape: raise ValueError( "If growth_rate is specified it must be a 1D array of the same " "length as the population_size array" ) if np.any(initial_size < 0): raise ValueError("population size values must be nonnegative.") if not np.all(np.isfinite(initial_size)): raise ValueError("population size values must be finite.") if not np.all(np.isfinite(growth_rate)): raise ValueError("growth_rate values must be finite.") populations = [ Population( initial_size=initial_size[j], growth_rate=growth_rate[j], ) for j in range(len(initial_size)) ] return Demography(populations=populations)
[docs] @staticmethod def island_model(initial_size, migration_rate, *, growth_rate=None) -> Demography: """ Returns a :class:`.Demography` object representing a collection of populations with specified initial population sizes and growth rates, with symmetric migration between each pair of populations at the specified rate. Please see :ref:`sec_demography` for more details on population sizes and growth rates. :param list initial_size: the ``initial_size`` value for each of the :class:`.Population` in the returned model. The length of the array corresponds to the number of populations. model. :param float migration_rate: The migration rate between each pair of populations. :param list growth_rate: The exponential growth rate for each population. Must be either None (the default, resulting a zero growth rate) or an array with the same length as ``initial_size``. :return: A Demography object representing this model, suitable as input to :func:`.sim_ancestry`. :rtype: msprime.Demography """ model = Demography.isolated_model(initial_size, growth_rate=growth_rate) check_migration_rate(migration_rate) model.migration_matrix[:] = migration_rate np.fill_diagonal(model.migration_matrix, 0) return model
[docs] @staticmethod def stepping_stone_model( initial_size, migration_rate, *, growth_rate=None, boundaries=False ) -> Demography: """ Returns a :class:`.Demography` object representing a collection of populations with specified initial population sizes and growth rates, in which adjacent demes exchange migrants at the specified rate. Please see :ref:`sec_demography` for more details on population sizes and growth rates. .. note:: The current implementation only supports a one-dimensional stepping stone model, but higher dimensions could also be supported. Please open an issue on GitHub if this feature would be useful to you. :param list initial_size: the ``initial_size`` value for each of the :class:`.Population` in the returned model. The length of the array corresponds to the number of populations. :param float migration_rate: The migration rate between adjacent pairs of populations. :param list growth_rate: The exponential growth rate for each population. Must be either None (the default, resulting a zero growth rate) or an array with the same length as ``initial_size``. :param bool boundaries: If True the stepping stone model has boundary conditions imposed so that demes at either end of the chain do not exchange migrants. If False (the default), the set of populations is "circular" and migration takes place between the terminal demes. :return: A Demography object representing this model, suitable as input to :func:`.sim_ancestry`. :rtype: msprime.Demography """ initial_size = np.array(initial_size, dtype=np.float64) if len(initial_size.shape) > 1: raise ValueError( "Only 1D stepping stone models currently supported. Please open " "an issue on GitHub if you would like 2D (or more) models" ) model = Demography.isolated_model(initial_size, growth_rate=growth_rate) check_migration_rate(migration_rate) if model.num_populations > 1: index1 = np.arange(model.num_populations, dtype=int) index2 = np.mod(index1 + 1, model.num_populations) model.migration_matrix[index1, index2] = migration_rate model.migration_matrix[index2, index1] = migration_rate if boundaries: model.migration_matrix[0, -1] = 0 model.migration_matrix[-1, 0] = 0 return model
@staticmethod def _ooa_model(): """ Returns the Gutenkunst et al three population out-of-Africa model. This version is included here temporarily as a way to get some test coverage on the model compared with stdpopsim. Because we use this model in the documentation, we want make sure that it's doing what we think. We compare the model defined here then with the one presented in the docs, to ensure that no errors creep in. Once the upstream code in stdpopsim is updated to use msprime 1.0 APIs we can remove this model and instead compare directly to the stdpopsim model with .is_equivalent() or whatever. """ # Times are provided in years, so we convert into generations. generation_time = 25 T_OOA = 21.2e3 / generation_time T_AMH = 140e3 / generation_time T_ANC = 220e3 / generation_time # We need to work out the starting (diploid) population sizes based on # the growth rates provided for these two populations r_CEU = 0.004 r_CHB = 0.0055 N_CEU = 1000 / math.exp(-r_CEU * T_OOA) N_CHB = 510 / math.exp(-r_CHB * T_OOA) demography = Demography() demography.add_population( name="YRI", description="Yoruba in Ibadan, Nigeria", initial_size=12300, ) demography.add_population( name="CEU", description=( "Utah Residents (CEPH) with Northern and Western European Ancestry" ), initial_size=N_CEU, growth_rate=r_CEU, ) demography.add_population( name="CHB", description="Han Chinese in Beijing, China", initial_size=N_CHB, growth_rate=r_CHB, ) demography.add_population( name="OOA", description="Bottleneck out-of-Africa population", initial_size=2100, ) demography.add_population( name="AMH", description="Anatomically modern humans", initial_size=12300, ) demography.add_population( name="ANC", description="Ancestral equilibrium population", initial_size=7300, ) # Set the migration rates between extant populations demography.set_symmetric_migration_rate(["CEU", "CHB"], 9.6e-5) demography.set_symmetric_migration_rate(["YRI", "CHB"], 1.9e-5) demography.set_symmetric_migration_rate(["YRI", "CEU"], 3e-5) demography.add_population_split( time=T_OOA, derived=["CEU", "CHB"], ancestral="OOA" ) demography.add_symmetric_migration_rate_change( time=T_OOA, populations=["YRI", "OOA"], rate=25e-5 ) demography.add_population_split( time=T_AMH, derived=["YRI", "OOA"], ancestral="AMH" ) demography.add_population_split(time=T_ANC, derived=["AMH"], ancestral="ANC") return demography def _ooa_trunk_model(): """ Returns the Gutenkunst et al three population out-of-Africa model, rendered in a more "old-style" way where we merge the various populations back into Africa. This version is included here temporarily as a way to get some test coverage on the model compared with stdpopsim. Because we use this model in the documentation, we want make sure that it's doing what we think. We compare the model defined here then with the one presented in the docs, to ensure that no errors creep in. Once the upstream code in stdpopsim is updated to use msprime 1.0 APIs we can remove this model and instead compare directly to the stdpopsim model with .is_equivalent() or whatever. """ # Times are provided in years, so we convert into generations. generation_time = 25 T_OOA = 21.2e3 / generation_time T_AMH = 140e3 / generation_time T_ANC = 220e3 / generation_time # We need to work out the starting (diploid) population sizes based on # the growth rates provided for these two populations r_CEU = 0.004 r_CHB = 0.0055 N_CEU = 1000 / math.exp(-r_CEU * T_OOA) N_CHB = 510 / math.exp(-r_CHB * T_OOA) demography = Demography() # This is the "trunk" population that we merge other populations into demography.add_population( name="YRI", description="Africa", initial_size=12300, initially_active=True, ) demography.add_population( name="CEU", description="European", initial_size=N_CEU, growth_rate=r_CEU, ) demography.add_population( name="CHB", description="East Asian", initial_size=N_CHB, growth_rate=r_CHB, ) demography.add_population( name="OOA", description="Bottleneck out-of-Africa population", initial_size=2100, ) # Set the migration rates between extant populations demography.set_symmetric_migration_rate(["CEU", "CHB"], 9.6e-5) demography.set_symmetric_migration_rate(["YRI", "CHB"], 1.9e-5) demography.set_symmetric_migration_rate(["YRI", "CEU"], 3e-5) demography.add_population_split( time=T_OOA, derived=["CEU", "CHB"], ancestral="OOA" ) demography.add_symmetric_migration_rate_change( time=T_OOA, populations=["YRI", "OOA"], rate=25e-5 ) demography.add_population_split(time=T_AMH, derived=["OOA"], ancestral="YRI") demography.add_population_parameters_change( time=T_ANC, population="YRI", initial_size=7300 ) return demography @staticmethod def _ooa_archaic_model(): """ See notes for the _ooa model above. """ # Implement the OutOfAfricaArchaicAdmixture_5R19 model # NOTE: this example isn't very well factored and needs more work. # Times are provided in years, so we convert into generations. generation_time = 29 T_OOA = 36_000 / generation_time T_AMH = 60_700 / generation_time T_ANC = 300_000 / generation_time T_ArchaicAFR = 499_000 / generation_time T_Neanderthal = 559_000 / generation_time T_archaic_migration_start = 18_700 / generation_time T_archaic_migration_end = 125_000 / generation_time # We need to work out the starting (diploid) population sizes based on # the growth rates provided for these two populations r_CEU = 0.00125 r_CHB = 0.00372 N_CEU = 2300 / math.exp(-r_CEU * T_OOA) N_CHB = 650 / math.exp(-r_CHB * T_OOA) demography = Demography() # This is the "trunk" population that we merge other populations into demography.add_population( name="AFR", description="African population", initial_size=13900, initially_active=True, ) demography.add_population( name="CEU", description=( "Utah Residents (CEPH) with Northern and Western European Ancestry" ), initial_size=N_CEU, growth_rate=r_CEU, ) demography.add_population( name="CHB", description="Han Chinese in Beijing, China", initial_size=N_CHB, growth_rate=r_CHB, ) demography.add_population( name="Neanderthal", description="Putative Neanderthals", initial_size=3600, ) demography.add_population( name="ArchaicAFR", description="Putative Archaic Africans", initial_size=3600, ) demography.add_population( name="OOA", description="Bottleneck out-of-Africa population", initial_size=880, ) # Set the migration rates between extant populations demography.set_symmetric_migration_rate(["CEU", "CHB"], 11.3e-5) demography.set_symmetric_migration_rate(["AFR", "CEU"], 2.48e-5) demography.add_symmetric_migration_rate_change( T_archaic_migration_start, ["CEU", "Neanderthal"], 0.825e-5 ) demography.add_symmetric_migration_rate_change( T_archaic_migration_start, ["CHB", "Neanderthal"], 0.825e-5 ) demography.add_symmetric_migration_rate_change( T_archaic_migration_start, ["ArchaicAFR", "AFR"], 1.98e-5 ) demography.add_migration_rate_change(T_archaic_migration_end, rate=0) demography.add_population_split( time=T_OOA, derived=["CEU", "CHB"], ancestral="OOA" ) demography.add_symmetric_migration_rate_change( time=T_OOA, populations=["AFR", "OOA"], rate=52.2e-5 ) demography.add_symmetric_migration_rate_change( time=T_OOA, populations=["OOA", "Neanderthal"], rate=0.825e-5 ) demography.add_population_split(time=T_AMH, derived=["OOA"], ancestral="AFR") demography.add_symmetric_migration_rate_change( T_AMH, ["ArchaicAFR", "AFR"], 1.98e-5 ) demography.add_population_parameters_change( time=T_AMH, population="AFR", initial_size=13900 ) demography.add_population_parameters_change( time=T_ANC, population="AFR", initial_size=3600 ) demography.add_population_split( time=T_ArchaicAFR, derived=["ArchaicAFR"], ancestral="AFR" ) demography.add_population_split( time=T_Neanderthal, derived=["Neanderthal"], ancestral="AFR" ) demography.sort_events() return demography @staticmethod def _american_admixture_model(): # Implementation of AmericanAdmixture_4B11 model. See notes from the _ooa # model above as to why this is here. T_OOA = 920 N_EUR = 34039 r_EUR = 0.0038 N_EAS = 45852 r_EAS = 0.0048 T_ADMIX = 12 N_ADMIX = 54664 r_ADMIX = 0.05 demography = Demography() demography.add_population(name="AFR", description="African", initial_size=14474) demography.add_population( name="EUR", description="European", initial_size=N_EUR, growth_rate=r_EUR, ) demography.add_population( name="EAS", description="East Asian", initial_size=N_EAS, growth_rate=r_EAS, ) demography.add_population( name="ADMIX", description="Admixed America", initial_size=N_ADMIX, growth_rate=r_ADMIX, ) demography.add_admixture( T_ADMIX, derived="ADMIX", ancestral=["AFR", "EUR", "EAS"], proportions=[1 / 6, 2 / 6, 3 / 6], ) demography.add_population( name="OOA", description="Bottleneck out-of-Africa", initial_size=1861, ) demography.add_population( name="AMH", description="Anatomically modern humans", initial_size=14474, ) demography.add_population( name="ANC", description="Ancestral equilibrium", initial_size=7310, ) demography.set_symmetric_migration_rate(["AFR", "EUR"], 2.5e-5) demography.set_symmetric_migration_rate(["AFR", "EAS"], 0.78e-5) demography.set_symmetric_migration_rate(["EUR", "EAS"], 3.11e-5) demography.add_population_split(T_OOA, derived=["EUR", "EAS"], ancestral="OOA") demography.add_symmetric_migration_rate_change( time=T_OOA, populations=["AFR", "OOA"], rate=15e-5 ) demography.add_population_split(2040, derived=["OOA", "AFR"], ancestral="AMH") demography.add_population_split(5920, derived=["AMH"], ancestral="ANC") return demography
[docs] def to_demes(self) -> demes.Graph: """ Creates a :class:`demes.Graph` object from the demography. See the :ref:`sec_demography_importing_demes` section for details. .. note:: Demographic models using bottlenecks added via the :meth:`add_simple_bottleneck` or :meth:`add_instantaneous_bottleneck` methods are not able to be converted into a demes graph. .. note:: Demes is stricter than msprime with regard to how a demographic model is structured, so models that can be simulated with msprime are not guaranteed to be convertible to a demes graph. In particular, msprime's legacy API permits setting migrations or other attributes for a population even after that population has been merged into an ancestor. Such models are rarely constructed deliberately, so an error during conversion of a legacy model could indicate model misspecification. The returned graph can be saved as a `Demes-format <https://popsim-consortium.github.io/demes-spec-docs/main/>`_ YAML file using the :ref:`demes <demes:sec_introduction>` API. .. code:: import demes demography = msprime.Demography.island_model([1000] * 3, 1e-5) graph = demography.to_demes() demes.dump(graph, "island_model.yaml") Or plotted using the `demesdraw <https://github.com/grahamgower/demesdraw>`_ visualisation package. .. code:: import demesdraw demography = msprime.Demography.island_model([1000] * 3, 1e-5) graph = demography.to_demes() ax = demesdraw.tubes(graph) ax.figure.savefig("island_model.pdf") :return: A :class:`demes.Graph` object corresponding to the demography. :rtype: demes.Graph """ dbg = self.debug() resolved = dbg.demography b = demes.Builder() for pop in resolved.populations: start_time = max( epoch.end_time for epoch in dbg.epochs if epoch.populations[pop.id].state == PopulationStateMachine.ACTIVE ) end_time = min( epoch.start_time for epoch in dbg.epochs if epoch.populations[pop.id].state == PopulationStateMachine.ACTIVE ) assert start_time > end_time initial_epoch = dict( end_size=pop.initial_size, growth_rate=pop.growth_rate, end_time=end_time, ) b.add_deme( pop.name, description=pop.description if pop.description else None, start_time=start_time, epochs=[initial_epoch], ) deme_map = {pop.name: b.data["demes"][pop.id] for pop in resolved.populations} # Copied from demes/ms.py def epoch_resolve(deme, time): """ Return the oldest epoch if it has end_time == time. If not, create a new oldest epoch with end_time=time. Also resolve sizes by dealing with the growth_rate attribute (if required). """ epoch = deme["epochs"][0] start_time = deme["start_time"] end_time = epoch["end_time"] if not (start_time > time >= end_time): raise ValueError( f"time {time} outside {deme['name']}'s existence interval " f"(start_time={start_time}, end_time={end_time}]" ) if time > end_time: new_epoch = copy.deepcopy(epoch) # find size at given time growth_rate = epoch.pop("growth_rate", 0) dt = time - epoch["end_time"] size_at_t = epoch["end_size"] * math.exp(-growth_rate * dt) epoch["start_size"] = size_at_t new_epoch["end_size"] = size_at_t new_epoch["end_time"] = time deme["epochs"].insert(0, new_epoch) epoch = new_epoch return epoch for time, events_group in itertools.groupby( resolved.events, operator.attrgetter("time") ): events_group_list = list(events_group) lineage_movements = [] for event in events_group_list: if isinstance(event, LineageMovementEvent): # Collect these so that we can later group them by source. lineage_movements.extend(event._as_lineage_movements()) elif isinstance(event, PopulationParametersChange): if event.population == -1: pids = range(len(resolved.populations)) else: pids = [event.population] for pid in pids: deme = deme_map[resolved[pid].name] if ( len(deme["epochs"]) == 1 and deme["epochs"][0]["end_size"] == 0 and event.initial_size is not None and event.initial_size > 0 ): # The population size was 0 at time 0, but is now # positive, indicating population extinction at the # current time. deme["epochs"][0]["end_time"] = time epoch = epoch_resolve(deme, time) if event.growth_rate is not None: epoch["growth_rate"] = event.growth_rate if event.initial_size is not None: epoch["end_size"] = event.initial_size elif isinstance(event, StateChangeEvent): raise ValueError(f"Cannot convert {event} to demes model.") # Lineage movement events correspond to either the source deme's # creation, or otherwise just pulses into the source. # We distinguish these cases based on the total ancestry # proportion for the source deme at this time. lm_by_source = collections.defaultdict(list) for lm in lineage_movements: source = resolved[lm.source].name lm_by_source[source].append(lm) pulses = set() for source, lm_group in lm_by_source.items(): ancestors = [resolved[lm.dest].name for lm in lm_group] proportions = _sequential_to_proportions( [lm.proportion for lm in lm_group] ) if math.isclose(sum(proportions), 1): # Source deme is created from the ancestors. deme = deme_map[source] deme.update( ancestors=ancestors, proportions=proportions, start_time=time ) for ancestor in ancestors: if not resolved[ancestor].initially_active: anc_deme = deme_map[ancestor] anc_deme["epochs"][-1]["end_time"] = time else: # Source deme receives pulses from the ancestors. pulses.update( (lm.source, lm.dest, lm.proportion) for lm in lm_group ) # The order of pulses matters when multiple pulses occur at the # same time, so we must be careful to add the pulses in the same # order as the lineage movements were specified. The pulses list # is later reversed, to correspond with the forwards-time ordering. for lm in lineage_movements: if (lm.source, lm.dest, lm.proportion) in pulses: b.add_pulse( sources=[resolved[lm.dest].name], dest=resolved[lm.source].name, proportions=[lm.proportion], time=time, ) # Resolve/remove growth_rate in oldest epochs. for deme in b.data["demes"]: start_time = deme.get("start_time", math.inf) epoch = deme["epochs"][0] growth_rate = epoch.pop("growth_rate", 0) if growth_rate != 0: if math.isinf(start_time): raise ValueError( f"{deme['name']}: growth rate for infinite-length " "epoch is invalid" ) dt = start_time - epoch["end_time"] epoch["start_size"] = epoch["end_size"] * math.exp(-dt * growth_rate) else: epoch["start_size"] = epoch["end_size"] # Copied from demes/ms.py def migrations_from_mm_list( mm_list: list[Any], end_times: list[float], deme_names: list[str] ) -> list[MutableMapping]: """ Convert a list of migration matrices into a list of migration dicts. """ assert len(mm_list) == len(end_times) migrations: list[MutableMapping] = [] current: dict[tuple[int, int], MutableMapping] = dict() start_time = math.inf for migration_matrix, end_time in zip(mm_list, end_times): n = len(migration_matrix) assert n == len(deme_names) for j in range(n): assert n == len(migration_matrix[j]) for k in range(n): if j == k: continue rate = migration_matrix[j][k] migration_dict = current.get((j, k)) if migration_dict is None: if rate != 0: migration_dict = dict( source=deme_names[j], dest=deme_names[k], start_time=start_time, end_time=end_time, rate=rate, ) current[(j, k)] = migration_dict migrations.append(migration_dict) else: if rate == 0: del current[(j, k)] elif migration_dict["rate"] == rate: # extend migration_dict migration_dict["end_time"] = end_time else: migration_dict = dict( source=deme_names[j], dest=deme_names[k], start_time=start_time, end_time=end_time, rate=rate, ) current[(j, k)] = migration_dict migrations.append(migration_dict) start_time = end_time return migrations mm_list = [epoch.migration_matrix.T for epoch in reversed(dbg.epochs)] mm_end_times = [epoch.start_time for epoch in reversed(dbg.epochs)] migrations = migrations_from_mm_list(mm_list, mm_end_times, list(resolved)) b.data["migrations"] = migrations # Reverse the order of pulses, so that older pulses come first. # This also fixes the order of pulses that occur simultaneously, # so that realised ancestry proportions match the msprime model. if "pulses" in b.data: b.data["pulses"].reverse() # Sort demes by their start time (so that ancestors come before descendants). b.data["demes"] = sorted( b.data["demes"], key=operator.itemgetter("start_time"), reverse=True ) return b.resolve()
# This was lifted out of older code as-is. No point in updating it # to use dataclasses, since all we want to do is maintain compatibility # with older code.
[docs]class PopulationConfiguration: """ The initial configuration of a population (or deme) in a simulation. .. important:: This class is deprecated (but supported indefinitely); please use the msprime 1.0 :ref:`demography API<sec_demography>` in new code. :param int sample_size: The number of initial samples that are drawn from this population. :param float initial_size: The absolute size of the population at time zero. Defaults to the reference population size :math:`N_e`. :param float growth_rate: The forwards-time exponential growth rate of the population per generation. Growth rates can be negative. This is zero for a constant population size, and positive for a population that has been growing. Defaults to 0. :param dict metadata: A JSON-encodable dictionary of metadata to associate with the corresponding Population in the output tree sequence. If not specified or None, no metadata is stored (i.e., an empty bytes array). Note that this metadata is ignored when using the ``from_ts`` argument to :func:`simulate`, as the population definitions in the tree sequence that is used as the starting point take precedence. """ def __init__( self, sample_size=None, initial_size=None, growth_rate=0.0, metadata=None ): if initial_size is not None and initial_size < 0: raise ValueError("Population size must be >= 0") if sample_size is not None and sample_size < 0: raise ValueError("Sample size must be >= 0") self.sample_size = sample_size self.initial_size = initial_size self.growth_rate = growth_rate self.metadata = metadata def asdict(self): return dict( sample_size=self.sample_size, initial_size=self.initial_size, growth_rate=self.growth_rate, metadata=self.metadata, )
def _list_str(a: list, fmt=None): """ Returns the specified items rendered as a string without quotes. """ if fmt is None: joined = ", ".join(str(item) for item in a) else: joined = ", ".join(f"{item:{fmt}}" for item in a) return f"[{joined}]" @dataclasses.dataclass class DemographicEvent: """ Superclass of demographic events that occur during simulations. """ time: float demography: Demography = dataclasses.field( init=False, compare=False, default=None, repr=False ) def _parameters(self): raise NotImplementedError() def _effect(self): raise NotImplementedError() def asdict(self): return { key: getattr(self, key) for key in inspect.signature(self.__init__).parameters.keys() if hasattr(self, key) } def _convert_id(self, population_ref): """ Converts the specified population reference into an integer, suitable for input into the low-level code. We treat -1 as a special case because it's used as meaning "all populations" by the events. """ if population_ref in [-1, None]: # Both of these mean "all populations" return -1 if self.demography is None: # We need to be able to handle Events that are not associated with # a Demography to support old code. However, these should only ever # happen with integer IDs. if not core.isinteger(population_ref): raise ValueError( "Working with demographic events not associated with a " "Demography object is a legacy-only operation. Population " "references must be integer IDs" ) return population_ref return self.demography[population_ref].id class ParameterChangeEvent(DemographicEvent): """ Superclass of events that change some parameters in the underlying simulation model but don't actually affect the state in any other way. """
[docs]@dataclasses.dataclass class PopulationParametersChange(ParameterChangeEvent): """ Changes the demographic parameters of a population at a given time. This event generalises the ``-eg``, ``-eG``, ``-en`` and ``-eN`` options from ``ms``. Note that unlike ``ms`` we do not automatically set growth rates to zero when the population size is changed. .. important:: This class is deprecated (but supported indefinitely); please use the :meth:`.Demography.add_population_parameters_change` method in new code. :param float time: The length of time ago at which this event occurred. :param float initial_size: The absolute diploid size of the population at the beginning of the time slice starting at ``time``. If None, this is calculated according to the initial population size and growth rate over the preceding time slice. :param float growth_rate: The new per-generation growth rate. If None, the growth rate is not changed. Defaults to None. :param int population: The ID of the population affected. If ``population`` is None, the changes affect all populations simultaneously. """ initial_size: float | None = None growth_rate: float | None = None # TODO change the default to -1 to match MigrationRateChange. population: int | None = None # Deprecated. # TODO add a formal deprecation notice population_id: int | None = dataclasses.field(default=None, repr=False) _type_str: ClassVar[str] = "Population parameter change" def __post_init__(self): if self.population_id is not None and self.population is not None: raise ValueError( "population_id and population are aliases; cannot supply both." ) if self.population_id is not None: self.population = self.population_id if self.growth_rate is None and self.initial_size is None: raise ValueError("Must specify one or more of growth_rate and initial_size") if self.initial_size is not None and self.initial_size < 0: raise ValueError("Cannot have a population size < 0") self.population = -1 if self.population is None else self.population def get_ll_representation(self, num_populations=None): # We need to keep the num_populations argument until stdpopsim 0.2 is out # https://github.com/tskit-dev/msprime/issues/1037 ret = { "type": "population_parameters_change", "time": self.time, "population": self._convert_id(self.population), } if self.growth_rate is not None: ret["growth_rate"] = self.growth_rate if self.initial_size is not None: ret["initial_size"] = self.initial_size return ret def _parameters(self): s = f"population={self.population}, " if self.initial_size is not None: s += f"initial_size={self.initial_size}, " if self.growth_rate is not None: s += f"growth_rate={self.growth_rate}, " return s[:-2] def _effect(self): s = "" if self.initial_size is not None: s += f"initial_size → {self.initial_size:.2g} " if self.growth_rate is not None: s += "and " if self.growth_rate is not None: s += f"growth_rate → {self.growth_rate:.3g} " s += "for" if self.population == -1: s += " all populations" else: s += f" population {self.population}" return s
[docs]@dataclasses.dataclass class MigrationRateChange(ParameterChangeEvent): """ Changes the rate of migration from one deme to another to a new value at a specific time. Migration rates are specified in terms of the rate at which lineages move from population ``source`` to ``dest`` during the progress of the simulation. Note that ``source`` and ``dest`` are from the perspective of the coalescent process; please see the :ref:`sec_ancestry_models` section for more details on the interpretation of this migration model. By default, ``source=-1`` and ``dest=-1``, which results in all non-diagonal elements of the migration matrix being changed to the new rate. If ``source`` and ``dest`` are specified, they must refer to valid population IDs. .. important:: This class is deprecated (but supported indefinitely); please use the :meth:`.Demography.add_migration_rate_change` method in new code. :param float time: The time at which this event occurs in generations. :param float rate: The new per-generation migration rate. :param int source: The ID of the source population. :param int dest: The ID of the destination population. """ rate: float source: int = -1 dest: int = -1 # Deprecated. # TODO add a formal deprecation notice matrix_index: tuple | None = dataclasses.field(default=None, repr=False) _type_str: ClassVar[str] = "Migration rate change" def __post_init__(self): # If the deprecated form is used, it overwrites the values of source # and dest if self.matrix_index is not None: self.source = self.matrix_index[0] self.dest = self.matrix_index[1] def get_ll_representation(self, num_populations=None): # We need to keep the num_populations argument until stdpopsim 0.1 is out # https://github.com/tskit-dev/msprime/issues/1037 return { "type": "migration_rate_change", "time": self.time, # Note: We'd like to change the name here to "rate" but it's best # to leave this alone until stdpopsim has been moved away from # using this internal API. "migration_rate": self.rate, "source": self._convert_id(self.source), "dest": self._convert_id(self.dest), } def _parameters(self): return f"source={self.source}, dest={self.dest}, rate={self.rate}" def _effect(self): ret = "Backwards-time migration rate " if self.source == -1 and self.dest == -1: ret += "for all populations " else: ret += f"from {self.source} to {self.dest} " ret += f"→ {self.rate}" return ret
@dataclasses.dataclass class SymmetricMigrationRateChange(ParameterChangeEvent): """ Class representing a SymmetricMigrationRateChange. Not part of the external API as it was added after 1.0. """ populations: list[int | str] rate: float _type_str: ClassVar[str] = dataclasses.field( default="Symmetric migration rate change", repr=False ) def get_ll_representation(self, num_populations=None): # We need to keep the num_populations argument until stdpopsim 0.1 is out # https://github.com/tskit-dev/msprime/issues/1037 return { "type": "symmetric_migration_rate_change", "time": self.time, "populations": [self._convert_id(pop) for pop in self.populations], "rate": self.rate, } def _parameters(self): return f"populations={_list_str(self.populations)}, rate={self.rate}" def _effect(self): s = "Sets the symmetric migration rate between " if len(self.populations) == 2: s += f"{self.populations[0]} and {self.populations[1]} " else: s += f"all pairs of populations in {_list_str(self.populations)} " s += f"to {self.rate} per generation" return s @dataclasses.dataclass class LineageMovement: """ A single instantaneous movement of lineages from one population to another. Note that 'source' and 'dest' are in the backwards-in-time sense. """ source: int dest: int proportion: float class LineageMovementEvent(DemographicEvent): """ Superclass of events that move lineages around between populations. """ def _as_lineage_movements(self) -> list[LineageMovement]: """ Returns the equivalent of this lineage movement event as a list of lineage movements. """ raise NotImplementedError()
[docs]@dataclasses.dataclass class MassMigration(LineageMovementEvent): """ A mass migration event in which some fraction of the population in one deme (the ``source``) simultaneously move to another deme (``dest``) during the progress of the simulation. Each lineage currently present in the source population moves to the destination population with probability equal to ``proportion``. Note that ``source`` and ``dest`` are from the perspective of the coalescent process; please see the :ref:`sec_ancestry_models` section for more details on the interpretation of this migration model. This event class generalises the population split (``-ej``) and admixture (``-es``) events from ``ms``. Note that MassMigrations do *not* have any side effects on the migration matrix. .. important:: This class is deprecated (but supported indefinitely); please use the :meth:`.Demography.add_mass_migration` method in new code. In addition, please see the new higher-level :ref:`sec_demography_events_population_split` and :ref:`sec_demography_events_admixture` events. :param float time: The time at which this event occurs in generations. :param int source: The ID of the source population. :param int dest: The ID of the destination population. :param float proportion: The probability that any given lineage within the source population migrates to the destination population. """ source: int # dest only has a default because of the deprecated destination attr. dest: None | int = None proportion: float = 1.0 # Deprecated. # TODO add a formal deprecation notice destination: int | None = dataclasses.field(default=None, repr=False) _type_str: ClassVar[str] = dataclasses.field(default="Mass Migration", repr=False) def __post_init__(self): if self.dest is not None and self.destination is not None: raise ValueError("dest and destination are aliases; cannot supply both") if self.destination is not None: self.dest = self.destination def get_ll_representation(self, num_populations=None): # We need to keep the num_populations argument until stdpopsim 0.1 is out # https://github.com/tskit-dev/msprime/issues/1037 return { "type": "mass_migration", "time": self.time, "source": self._convert_id(self.source), "dest": self._convert_id(self.dest), "proportion": self.proportion, } def _parameters(self): return ( f"source={self.source}, dest={self.dest}, proportion={self.proportion:.3g}" ) def _effect(self): if self.proportion == 1.0: ret = ( f"All lineages currently in population {self.source} move " f"to {self.dest} " ) else: ret = ( f"Lineages currently in population {self.source} move to {self.dest} " f"with probability {self.proportion:.3g} " ) ret += ( "(equivalent to individuals " f"migrating from {self.dest} to {self.source} forwards in time)" ) return ret def _as_lineage_movements(self): return [ LineageMovement( source=self._convert_id(self.source), dest=self._convert_id(self.dest), proportion=self.proportion, ) ]
@dataclasses.dataclass class ActivatePopulationEvent(DemographicEvent): """ A population activation event, which changes the state of the population from inactive to active at the given time. """ population: int | str | None = None """ The name or ID of the population to activate. """ _type_str: ClassVar[str] = dataclasses.field( default="Activate population event", repr=False ) def get_ll_representation(self, num_populations=None): # We need to keep the num_populations argument until stdpopsim 0.1 is out # https://github.com/tskit-dev/msprime/issues/1037 return { "type": "activate_population_event", "time": self.time, "population": self._convert_id(self.population), } def _parameters(self): return f"population={self.population}" def _effect(self): s = f"Activates population {self.population}" return s @dataclasses.dataclass class PopulationSplit(LineageMovementEvent): """ :param float time: The time at which this event occurs in generations. :param list(int) derived: The ID(s) of the derived population(s). :param int ancestral: The ID of the ancestral population. """ derived: list[int | str] ancestral: int | str _type_str: ClassVar[str] = dataclasses.field(default="Population Split", repr=False) def get_ll_representation(self, num_populations=None): # We need to keep the num_populations argument until stdpopsim 0.1 is out # https://github.com/tskit-dev/msprime/issues/1037 return { "type": "population_split", "time": self.time, "derived": [self._convert_id(pop) for pop in self.derived], "ancestral": self._convert_id(self.ancestral), } def _parameters(self): return f"derived={_list_str(self.derived)}, ancestral={self.ancestral}" def _effect(self): s = "Moves all lineages from " if len(self.derived) == 1: s += f"the '{self.derived[0]}' derived population " else: s += "derived populations " if len(self.derived) == 2: s += f"'{self.derived[0]}' and '{self.derived[1]}' " else: s += f"{_list_str(self.derived)} " s += f"to the ancestral '{self.ancestral}' population. " s += "Also set " if len(self.derived) == 1: s += f"'{self.derived[0]}' " else: s += "the derived populations " s += ( "to inactive, and all migration rates to and from " f"the derived population{'s' if len(self.derived) > 1 else ''} " "to zero." ) return s def _as_lineage_movements(self): ancestral = self._convert_id(self.ancestral) return [ LineageMovement(source=self._convert_id(pop), dest=ancestral, proportion=1) for pop in self.derived ] @dataclasses.dataclass class Admixture(LineageMovementEvent): """ :param float time: The time at which this event occurs in generations. """ derived: int | str ancestral: list[int | str] proportions: list[float] _type_str: ClassVar[str] = dataclasses.field(default="Admixture", repr=False) @property def num_ancestral(self): return len(self.ancestral) def get_ll_representation(self, num_populations=None): # We need to keep the num_populations argument until stdpopsim 0.1 is out # https://github.com/tskit-dev/msprime/issues/1037 return { "type": "admixture", "time": self.time, "derived": self._convert_id(self.derived), "ancestral": [self._convert_id(pop) for pop in self.ancestral], "proportions": self.proportions, } def _parameters(self): return ( f"derived={self.derived} ancestral={_list_str(self.ancestral)} " f"proportions={_list_str(self.proportions, '.2f')}" ) def _effect(self): move_to = "; ".join( f"'{pop}' with proba {proba:.3g}" for pop, proba in zip(self.ancestral, self.proportions) ) return ( f"Moves all lineages from admixed population '{self.derived}' " f"to ancestral population{'s' if len(self.ancestral) > 1 else ''}. " f"Lineages move to {move_to}. Set '{self.derived}' to inactive, " f"and all migration rates to and from '{self.derived}' to zero." ) def _as_lineage_movements(self): derived = self._convert_id(self.derived) ancestral = [self._convert_id(pop) for pop in self.ancestral] # Conditioned on having already distributed a fraction q of the # lineages, the we need a fraction p / (1 - q) of the remaining # lineages to get an overall proportion of p. S = _proportions_to_sequential(self.proportions) return [ LineageMovement(source=derived, dest=ancestral[j], proportion=S[j]) for j in range(self.num_ancestral) ] class StateChangeEvent(DemographicEvent): """ Superclass of events that change the state of the simulation in complex ways. """ # This is an unsupported/undocumented demographic event. @dataclasses.dataclass class SimpleBottleneck(StateChangeEvent): population: int proportion: float = 1.0 def get_ll_representation(self, num_populations=None): # We need to keep the num_populations argument until stdpopsim 0.1 is out # https://github.com/tskit-dev/msprime/issues/1037 return { "type": "simple_bottleneck", "time": self.time, "population": self._convert_id(self.population), "proportion": self.proportion, } _type_str: ClassVar[str] = dataclasses.field( default="Simple Bottleneck", repr=False ) def _parameters(self): return f"population={self.population}, proportion={self.proportion}" def _effect(self): return ( f"Lineages in population {self.population} coalesce with " f"probability {self.proportion}" ) # TODO document @dataclasses.dataclass class InstantaneousBottleneck(StateChangeEvent): population: int strength: float = 1.0 def get_ll_representation(self, num_populations=None): # We need to keep the num_populations argument until stdpopsim 0.1 is out # https://github.com/tskit-dev/msprime/issues/1037 return { "type": "instantaneous_bottleneck", "time": self.time, "population": self._convert_id(self.population), "strength": self.strength, } _type_str: ClassVar[str] = dataclasses.field( default="Instantaneous Bottleneck", repr=False ) def _parameters(self): return f"population={self.population}, strength={self.strength}" def _effect(self): return f"Equivalent to {self.strength} generations of the coalescent"
[docs]@dataclasses.dataclass class CensusEvent(DemographicEvent): """ An event that adds a node to each branch of every tree at a given time during the simulation. This may be used to record all ancestral haplotypes present at that time, and to extract other information related to these haplotypes: for instance to trace the local ancestry of a sample back to a set of contemporaneous ancestors, or to assess whether a subset of samples has coalesced more recently than the census time. See :ref:`sec_ancestry_census_events` for more details. .. important:: This class is deprecated (but supported indefinitely); please use the :meth:`.Demography.add_census` method in new code. :param float time: The time at which this event occurs in generations. """ _type_str: ClassVar[str] = dataclasses.field(default="Census", repr=False) def get_ll_representation(self, num_populations=None): # We need to keep the num_populations argument until stdpopsim 0.1 is out # https://github.com/tskit-dev/msprime/issues/1037 return { "type": "census_event", "time": self.time, } def _parameters(self): return "" def _effect(self): return "Insert census nodes to record the location of all lineages"
def _sequential_to_proportions(S): """ Given a list of sequential lineage proportions out of a population, return the absolute proportions of the original population this corresponds to. """ P = [] for j in range(len(S)): P.append(S[j] * (1 - sum(P[:j]))) return P def _proportions_to_sequential(P): """ Given a list of absolute proportions of lineages moving out of a population, return the sequential conditional movements required to give them same proportions. """ # Conditioned on having already distributed a fraction q of the # lineages, the we need a fraction p / (1 - q) of the remaining # lineages to get an overall proportion of p C = [0 for _ in P] for j in range(len(P)): s = sum(P[:j]) if s < 1: C[j] = P[j] / (1 - s) return C def _matrix_exponential(A, n=5): """ Returns the matrix exponential of A. Only works if the offdiagonals of A are nonnegative and the row sums of A are less than or equal to zero. Melloy & Bennett (1993), https://doi.org/10.1016/0377-0427(93)90036-B """ assert np.max(np.diag(A)) <= 0 assert np.min(np.tril(A, k=-1)) == 0 assert np.min(np.triu(A, k=1)) == 0 assert np.max(np.sum(A, 1)) <= 1e-10 # for floating-point error dA = (-1) * np.diag(A) dmax = np.max(dA) expA = np.eye(A.shape[0]) if dmax > 0: P = A / dmax np.fill_diagonal(P, 1 - dA / dmax) # nscales is the number of scaling-and-squaring steps: # we compute exp(tA) and then square the result nscales times nscales = int(max(0, np.ceil(np.log2(dmax) - np.log2(0.2)))) t = dmax / (2**nscales) # Now, exp(tA) = exp(-t) * (I + tP + (tP)^2 / 2 + ...) # and the k-th term in this sum is Pk=(tP/k times the previous) Pk = np.eye(A.shape[0]) for k in range(1, n + 1): Pk = (t / k) * np.matmul(P, Pk) expA = expA + Pk expA *= np.exp(-t) expA = np.linalg.matrix_power(expA, 2**nscales) return expA class PopulationStateMachine(enum.IntEnum): """ During a simulation each population has three possible states described by this state machine. In general, a population follows: INACTIVE -> ACTIVE -> PREVIOUSLY_ACTIVE All populations are by default ACTIVE at the start of the simulation, except if they are they are "ancestral" in a population split event. In this case populations are initially INACTIVE. A population then transitions from INACTIVE -> ACTIVE when the corresponding population split event occurs. Populations transition from ACTIVE -> PREVIOUSLY_ACTIVE when they are "derived" in either population split or admixture events. No other transitions are possible. """ INACTIVE = 0 ACTIVE = 1 PREVIOUSLY_ACTIVE = 2 @dataclasses.dataclass class PopulationState: """ Simple class to represent the state of a population in terms of its demographic parameters. Note: start and end here refer to time flowing *backwards*! """ id: int # noqa: A003 name: str start_size: float end_size: float growth_rate: float state: int @property def active(self): return self.state == PopulationStateMachine.ACTIVE @dataclasses.dataclass class Epoch: """ Represents a single epoch in the simulation within which the state of the demographic parameters are constant. """ index: int start_time: float end_time: float populations: list[PopulationState] migration_matrix: list # TODO numpy array events: list[DemographicEvent] def _title_text(self): return ( f"Epoch[{self.index}]: " f"[{self.start_time:.3g}, {self.end_time:.3g}) generations" ) def _population_state_text(self): return ( f"Populations " f"(total={len(self.populations)} active={self.num_active_populations})" ) @property def demographic_events(self): # For compatibility with msprime 0.x return self.events @property def active_populations(self): return [pop for pop in self.populations if pop.active] @property def num_active_populations(self): return len(self.active_populations)
[docs]class DemographyDebugger: """ Utilities to compute and display information about the state of populations during the different simulation epochs defined by demographic events. .. warning:: This class is not intended to be instantiated directly using the constructor - please use :meth:`.Demography.debug()` to obtain a DemographyDebugger for a given :class:`.Demography` instead. """ def __init__( self, # Deprecated pre-1.0 parameters. Ne=1, population_configurations=None, migration_matrix=None, demographic_events=None, model=None, *, demography=None, ): if demography is None: # Support the pre-1.0 syntax demography = Demography.from_old_style( population_configurations, migration_matrix=migration_matrix, demographic_events=demographic_events, Ne=Ne, ignore_sample_size=True, ) self.demography = demography.validate() self.num_populations = demography.num_populations self._make_epochs() self._check_misspecification() def _make_epochs(self): self.epochs = [] simulator = ancestry._parse_sim_ancestry( demography=self.demography, init_for_debugger=True ) start_time = 0 end_time = 0 abs_tol = 1e-9 event_index = 0 all_events = self.demography.events while not math.isinf(end_time): events = [] while event_index < len(all_events) and math.isclose( all_events[event_index].time, start_time, abs_tol=abs_tol ): events.append(all_events[event_index]) event_index += 1 end_time = simulator.debug_demography() migration_matrix = simulator.migration_matrix pop_conf = simulator.population_configuration populations = [ PopulationState( id=j, name=self.demography.populations[j].name, start_size=simulator.compute_population_size(j, start_time), end_size=simulator.compute_population_size(j, end_time), growth_rate=pop_conf[j]["growth_rate"], state=PopulationStateMachine(pop_conf[j]["state"]), ) for j in range(self.num_populations) ] epoch_index = len(self.epochs) self.epochs.append( Epoch( epoch_index, start_time, end_time, populations, migration_matrix, events, ) ) start_time = end_time def _check_misspecification(self): """ Check for things that might indicate model misspecification. """ merged_pops = set() for epoch in self.epochs: for de in epoch.events: if isinstance(de, MassMigration) and de.proportion == 1: merged_pops.add(de.source) mm = epoch.migration_matrix for pop_k in merged_pops: k = self.demography[pop_k].id if any(mm[k, :] != 0) or any(mm[:, k] != 0): warnings.warn( "Non-zero migration rates exist after merging " f"population {k}. This almost certainly indicates " "demographic misspecification.", stacklevel=2, ) def _populations_table(self, epoch, as_text=True): active_populations = epoch.active_populations column_titles = ["", "start", "end", "growth_rate"] if len(active_populations) > 1: column_titles += [pop.name for pop in active_populations] data = [] for pop in active_populations: row = [ pop.name, f"{pop.start_size: .1f}", f"{pop.end_size: .1f}", f"{pop.growth_rate: .3g}", ] if len(active_populations) > 1: for other_pop in active_populations: item = self.demography._migration_rate_info( pop.id, other_pop.id, epoch.migration_matrix[pop.id, other_pop.id], ) if as_text: row.append(item.as_text()) else: row.append(item) data.append(row) return column_titles, data def _populations_html(self, epoch): column_titles, data = self._populations_table(epoch, as_text=False) return core.html_table(epoch._population_state_text(), column_titles, data) def _populations_text(self, epoch): column_titles, data = self._populations_table(epoch) alignments = ">>><" if epoch.num_active_populations > 1: alignments += "^" * epoch.num_active_populations # Repack the table items as lists column_titles = [[x] for x in column_titles] data = [[[x] for x in row] for row in data] return core.text_table( epoch._population_state_text(), column_titles, alignments, data ) def _repr_html_(self): out = "" for epoch in self.epochs: if epoch.index > 0: assert len(epoch.events) > 0 title = f"Events @ generation {epoch.start_time:.3g}" out += self.demography._events_html(epoch.events, title) out += "</div></details>" else: assert len(epoch.events) == 0 title = epoch._title_text() out += f'<details open="true"><summary>{title}</summary>' # Indent the content div slightly out += '<div style="margin-left:20px">' out += self._populations_html(epoch) out += "</div>" out += "</details>" return f"<div>{out}</div>"
[docs] def print_history(self, output=sys.stdout): """ Prints a summary of the history of the populations. Deprecated since 1.0: use ``print(debugger)`` instead. """ print(self, file=output, end="")
def __str__(self): def indent(table, header_char="╟", depth=4): lines = table.splitlines() s = header_char + (" " * depth) + lines[0] + "\n" for line in lines[1:]: s += "║" + (" " * depth) + line + "\n" return s def box(title): N = len(title) + 2 top = "╠" + ("═" * N) + "╗" bottom = "╠" + ("═" * N) + "╝" return f"{top}\n{title}\n{bottom}\n" out = "DemographyDebugger\n" for epoch in self.epochs: if epoch.index > 0: assert len(epoch.events) > 0 title = f"Events @ generation {epoch.start_time:.3g}" out += indent(self.demography._events_text(epoch.events, title)) out += box(epoch._title_text()) out += indent(self._populations_text(epoch)) return out @property def population_size_history(self): """ Returns a (num_pops, num_epochs) numpy array giving the starting population size for each population in each epoch. """ pop_size = np.zeros((self.num_populations, self.num_epochs)) for j, epoch in enumerate(self.epochs): for k, pop in enumerate(epoch.populations): pop_size[k, j] = pop.start_size return pop_size @property def epoch_start_time(self): """ The array of epoch start_times defined by the demographic model. """ return np.array([x.start_time for x in self.epochs]) @property def num_epochs(self): """ Returns the number of epochs defined by the demographic model. """ return len(self.epochs)
[docs] def population_size_trajectory(self, steps): """ Return an array of per-population population sizes, as defined by the demographic model. These are the `initial_size` parameters of the model, modified by any population growth rates. The sizes are computed at the time points given by `steps`. :param list steps: List of times ago at which the population size will be computed. :return: Returns a numpy array of population sizes, with one column per population, whose [i,j]th entry is the size of population j at time steps[i] ago. """ num_pops = self.num_populations N_t = np.zeros([len(steps), num_pops]) for j, t in enumerate(steps): N, _ = self._pop_size_and_migration_at_t(t) N_t[j] = N return N_t
[docs] def lineage_probabilities(self, steps, sample_time=0): """ Returns an array such that P[j, a, b] is the probability that a lineage that started in population a at time sample_time is in population b at time steps[j] ago. This function reports sampling probabilities _before_ mass migration events (or other events that move lineages) at a step time, if a mass migration event occurs at one of those times. Migrations will then effect the next time step. :param list steps: A list of times to compute probabilities. :param sample_time: The time of sampling of the lineage. For any times in steps that are more recent than sample_time, the probability of finding the lineage in any population is zero. :return: An array of dimension len(steps) by num pops by num_pops. """ num_pops = self.num_populations # P[i, j] will be the probability that a lineage that started in i is now in j P = np.eye(num_pops) # epochs are defined by mass migration events or changes to population sizes # or migration rates, so we add the epoch interval times to the steps that we # need to account for epoch_breaks = [t for t in self.epoch_start_time if t not in steps] all_steps = np.concatenate([steps, epoch_breaks]) sampling = [] if sample_time not in all_steps: sampling.append(sample_time) all_steps = np.concatenate((all_steps, sampling)) ix = np.argsort(all_steps) all_steps = all_steps[ix] # keep track of the steps to report in P_out keep_steps = np.concatenate( [ np.repeat(True, len(steps)), np.repeat(False, len(epoch_breaks)), np.repeat(False, len(sampling)), ] )[ix] assert len(np.unique(all_steps)) == len(all_steps) assert np.all(steps == all_steps[keep_steps]) P_out = np.zeros((len(all_steps), num_pops, num_pops)) first_step = 0 while all_steps[first_step] < sample_time: first_step += 1 P_out[first_step] = P # get ordered mass migration events mass_migration_objects = [] mass_migration_times = [] for demo in self.demography.events: if isinstance(demo, LineageMovementEvent): # Convert higher-level lineage movement events like Admixtures # and PopulationSplits into LineageMovement instances. These are # equivalent to MassMigrations for lm in demo._as_lineage_movements(): mass_migration_objects.append(lm) mass_migration_times.append(demo.time) for jj in range(first_step, len(all_steps) - 1): t_j = all_steps[jj] # apply any mass migration events to P # so if we sample at this time, we do not account for the instantaneous # mass migration events that occur at the same time. that will show up # at the next step if t_j > sample_time: for mass_mig_t, mass_mig_e in zip( mass_migration_times, mass_migration_objects ): if mass_mig_t == t_j: S = np.eye(num_pops, num_pops) S[mass_mig_e.source, mass_mig_e.dest] = mass_mig_e.proportion S[mass_mig_e.source, mass_mig_e.source] = ( 1 - mass_mig_e.proportion ) P = np.matmul(P, S) # get continuous migration matrix over next interval _, M = self._pop_size_and_migration_at_t(t_j) dt = all_steps[jj + 1] - all_steps[jj] dM = np.diag([sum(s) for s in M]) # advance to next interval time (dt) taking into account continuous mig P = P.dot(_matrix_exponential(dt * (M - dM))) P_out[jj + 1] = P return P_out[keep_steps]
[docs] def possible_lineage_locations(self, samples=None): """ Given the sampling configuration, this function determines when lineages are possibly found within each population over epochs defined by demographic events and sampling times. If no sampling configuration is given, we assume we sample lineages from every population at time zero. The epoch intervals returned are those in which there are *distinct* configurations of possible lineage locations, and so the number of returned epochs may be less than the total number of epochs defined by the demography and will depend on the input sample configuration. The samples are specified by either a list of population identifiers ( integer IDs or string names) or by a list of :class:`.SampleSet` objects, allowing sampling times to be specified explicitly. If the ``time`` field of the :class:`.SampleSet` is not specified (or population IDs are used) samples are taken at the population's `default_sampling_time`. Only :class:`.SampleSet` objects with ``num_samples > 0`` are counted as contributing samples to a particular population. To support legacy code, :class:`.Sample` objects from the 0.x API can also provided, although its use is discouraged in new code. :param list samples: The populations that we sample from. Can be either a list of population identifiers, :class:`.SampleSet` or :class:`.Sample` objects. :return: Returns a dictionary with epoch intervals as keys whose values are a list with length equal to the number of populations with True and False indicating which populations could possibly contain lineages over that epoch. The epoch intervals are given by tuples: (epoch start, epoch end). The first epoch necessarily starts at time 0, and the final epoch has end time of infinity. """ if samples is None: samples = [ pop.id for pop in self.demography.populations if pop.default_sampling_time == 0 ] # get configuration of sampling times from samples ({time:[pops_sampled_from]}) sampling_times = collections.defaultdict(list) for sample in samples: if isinstance(sample, (ancestry.Sample, ancestry.SampleSet)): pop_id = self.demography[sample.population].id sample_time = ( self.demography[pop_id].default_sampling_time if sample.time is None else sample.time ) if isinstance(sample, ancestry.SampleSet) and sample.num_samples <= 0: # If someone specifies 0 samples it should not be counted continue else: # Assume this is a population identifier. pop = self.demography[sample] pop_id = pop.id sample_time = pop.default_sampling_time sampling_times[sample_time].append(pop_id) for t in sampling_times.keys(): sampling_times[t] = list(set(sampling_times[t])) all_steps = sorted( list(set([t for t in self.epoch_start_time] + list(sampling_times.keys()))) ) epochs = [(x, y) for x, y in zip(all_steps[:-1], all_steps[1:])] epochs.append((all_steps[-1], np.inf)) # need to go a bit beyond last step and into the final epoch that extends to inf all_steps.append(all_steps[-1] + 1) indicators = {e: np.zeros(self.num_populations, dtype=bool) for e in epochs} for sample_time, pop_ids in sampling_times.items(): P_out = self.lineage_probabilities(all_steps, sample_time=sample_time) for epoch, P in zip(epochs, P_out[1:]): if epoch[1] <= sample_time: # samples shouldn't affect the epoch previous to the sampling time continue for pop_id in pop_ids: indicators[epoch][P[pop_id] > 0] = True # join epochs if adjacent epochs have same set of possible live populations combined_indicators = {} skip = 0 for ii, (epoch, inds) in enumerate(indicators.items()): if skip > 0: skip -= 1 continue this_epoch = epoch while ii + skip + 1 < len(epochs) and np.all( indicators[epochs[ii + 1 + skip]] == inds ): this_epoch = (this_epoch[0], epochs[ii + 1 + skip][1]) skip += 1 combined_indicators[this_epoch] = inds return combined_indicators
[docs] def mean_coalescence_time( self, lineages, min_pop_size=1, steps=None, rtol=0.005, max_iter=12 ): """ Compute the mean time until coalescence between pairs of the specified sample ``lineages``. Sample lineages are specified as a mapping from populations to the number of **monoploid** sample genomes present in that population at time zero. See the :ref:`sec_demography_numerical_coalescence` section for usage examples and more details. .. important:: This function assumes a diploid model when computing coalescence rates (see the :ref:`sec_ancestry_ploidy_coalescent_time_scales` section for more information). The calculation is performed by using :meth:`~.DemographyDebugger.coalescence_rate_trajectory` to compute the probability that the lineages have not yet coalesced by time `t`, and using these to approximate :math:`E[T] = \\int_t^\\infty P(T > t) dt`, where :math:`T` is the coalescence time. See :meth:`~.DemographyDebugger.coalescence_rate_trajectory` for more details. To compute this, an adequate time discretisation must be arrived at by iteratively extending or refining the current discretisation. Debugging information about numerical convergence of this procedure is logged using the Python :mod:`logging` infrastructure. The `daiquiri <https://pypi.org/project/daiquiri/>`_ module is a convenient way to set up logging, and we can use it to make these messages appear on stderr like this:: import daiquiri daiquiri.setup(level="DEBUG") debugger.mean_coalescence_time(1) Briefly, this outputs iteration number, mean coalescence time, maximum difference in probability of not having coalesced yet, difference to last coalescence time, probability of not having coalesced by the final time point, and whether the last iteration was an extension or refinement. :param dict lineages: A mapping of populations (either integer IDs or string names: see the :ref:`sec_demography_populations_identifiers` section for more details) to the number of monoploid sample lineages in that population. :param int min_pop_size: See :meth:`~.DemographyDebugger.coalescence_rate_trajectory`. :param list steps: The time discretisation to start out with (by default, picks something based on epoch times). :param float rtol: The relative tolerance to determine mean coalescence time to (used to decide when to stop subdividing the steps). :param int max_iter: The maximum number of times to subdivide the steps. :return: The mean coalescence time (a number). :rtype: float """ def mean_time(steps, P): # Mean is int_0^infty P(T > t) dt, which we estimate by discrete integration # assuming that f(t) = P(T > t) is piecewise exponential: # if f(u) = a exp(bu) then b = log(f(t)/f(s)) / (t-s) for each s < t, so # \int_s^t f(u) du = (a/b) \int_s^t exp(bu) b du = (a/b)(exp(bt) - exp(bs)) # = (t - s) * (f(t) - f(s)) / log(f(t) / f(s)) # unless b = 0, of course. assert steps[0] == 0 dt = np.diff(steps) dP = np.diff(P) with np.errstate(divide="ignore", invalid="ignore"): dlogP = np.diff(np.log(P)) nz = np.logical_and(dP < 0, P[1:] * P[:-1] > 0) const = dP == 0 return np.sum(dt[const] * (P[:-1])[const]) + np.sum( dt[nz] * dP[nz] / dlogP[nz] ) if steps is None: last_N = max(self.population_size_history[:, self.num_epochs - 1]) last_epoch = self.epoch_start_time[-1] steps = sorted( list( set(np.linspace(0, last_epoch + 12 * last_N, 101)).union( set(self.epoch_start_time) ) ) ) p_diff = m_diff = np.inf last_P = np.inf step_type = "none" n = 0 logger.debug( "iter mean P_diff mean_diff last_P adjust_type " "num_steps last_step" ) # The factors of 20 here are probably not optimal: clearly, we need to # compute P accurately, but there's no good reason for this stopping rule. # If populations have piecewise constant size then we shouldn't need this: # setting steps equal to the epoch boundaries should suffice; while if # there is very fast exponential change in some epochs caution is needed. while n < max_iter and ( last_P > rtol or p_diff > rtol / 20 or m_diff > rtol / 20 ): last_steps = steps if n == 0: _, P1 = self.coalescence_rate_trajectory( steps=last_steps, lineages=lineages, min_pop_size=min_pop_size, double_step_validation=False, ) m1 = mean_time(last_steps, P1) if last_P > rtol: step_type = "extend" steps = np.concatenate( [steps, np.linspace(steps[-1], steps[-1] * 1.2, 20)[1:]] ) else: step_type = "refine" inter = steps[:-1] + np.diff(steps) / 2 steps = np.concatenate([steps, inter]) steps.sort() _, P2 = self.coalescence_rate_trajectory( steps=steps, lineages=lineages, min_pop_size=min_pop_size, double_step_validation=False, ) m2 = mean_time(steps, P2) keep_steps = np.in1d(steps, last_steps) p_diff = max(np.abs(P1 - P2[keep_steps])) m_diff = np.abs(m1 - m2) / m2 last_P = P2[-1] n += 1 # Use the old-style string formatting as this is the logging default logger.debug( "%d %g %g %g %g %s %d %d", n, m2, p_diff, m_diff, last_P, step_type, len(steps), max(steps), ) P1 = P2 m1 = m2 if n == max_iter: raise ValueError( "Did not converge on an adequate discretisation: " "Increase max_iter or rtol. Consult the log for " "debugging information" ) return m2
[docs] def coalescence_rate_trajectory( self, steps, lineages, min_pop_size=1, double_step_validation=True ): """ Calculate the mean coalescence rates and proportions of uncoalesced lineages between pairs of the specified sample lineages, at each of the times ago listed by steps, in this demographic model. Sample lineages are specified as a mapping from populations to the number of **monoploid** sample genomes present in that population at time zero. See the :ref:`sec_demography_numerical_trajectories` section for usage examples and more details. The coalescence rate at time t in the past is the average rate of coalescence of as-yet-uncoalesced lineages, computed as follows: let :math:`p(t)` be the probability that the lineages of a randomly chosen pair of samples has not yet coalesced by time :math:`t`, let :math:`p(z,t)` be the probability that the lineages of a randomly chosen pair of samples has not yet coalesced by time :math:`t` *and* are both in population :math:`z`, and let :math:`N(z,t)` be the diploid effective population size of population :math:`z` at time :math:`t`. Then the mean coalescence rate at time :math:`t` is :math:`r(t) = (\\sum_z p(z,t) / (2 * N(z,t))) / p(t)`. The computation is done by approximating population size trajectories with piecewise constant trajectories between each of the steps. For this to be accurate, the distance between the steps must be small enough so that (a) short epochs (e.g., bottlenecks) are not missed, and (b) populations do not change in size too much over that time, if they are growing or shrinking. This function optionally provides a simple check of this approximation by recomputing the coalescence rates on a grid of steps twice as fine and throwing a warning if the resulting values do not match to a relative tolerance of 0.001. :param list steps: The times ago at which coalescence rates will be computed. :param dict lineages: A mapping of populations (either integer IDs or string names: see the :ref:`sec_demography_populations_identifiers` section for more details) to the number of monoploid sample lineages in that population. :param int min_pop_size: The smallest allowed population size during computation of coalescent rates (i.e., coalescence rates are actually 1 / (2 * max(min_pop_size, N(z,t))). Spurious very small population sizes can occur in models where populations grow exponentially but are unused before some time in the past, and lead to floating point error. This should be set to a value smaller than the smallest desired population size in the model. :param bool double_step_validation: Whether to perform the check that step sizes are sufficiently small, as described above. This is highly recommended, and will take at most four times the computation. :return: A tuple of arrays whose jth elements, respectively, are the coalescence rate at the jth time point (denoted r(t[j]) above), and the probability that a randomly chosen pair of lineages has not yet coalesced (denoted p(t[j]) above). :rtype: (numpy.ndarray, numpy.ndarray) """ num_samples = np.zeros(self.num_populations, dtype=int) for population, num_genomes in lineages.items(): pop_id = self.demography[population].id num_samples[pop_id] += num_genomes steps = np.array(steps) if not np.all(np.diff(steps) > 0): raise ValueError("`steps` must be a sequence of increasing times.") if np.any(steps < 0): raise ValueError("`steps` must be non-negative") r, p_t = self._calculate_coalescence_rate_trajectory( steps=steps, num_samples=num_samples, min_pop_size=min_pop_size ) if double_step_validation: inter = steps[:-1] + np.diff(steps) / 2 double_steps = np.concatenate([steps, inter]) double_steps.sort() rd, p_td = self._calculate_coalescence_rate_trajectory( steps=double_steps, num_samples=num_samples, min_pop_size=min_pop_size ) assert np.all(steps == double_steps[::2]) r_prediction_close = np.allclose(r, rd[::2], rtol=1e-3, equal_nan=True) p_prediction_close = np.allclose(p_t, p_td[::2], rtol=1e-3, equal_nan=True) if not (r_prediction_close and p_prediction_close): warnings.warn( "Doubling the number of steps has resulted in different " " predictions, please re-run with smaller step sizes to ensure " " numerical accuracy.", stacklevel=2, ) return r, p_t
def _calculate_coalescence_rate_trajectory(self, steps, num_samples, min_pop_size): num_pops = self.num_populations P = np.zeros([num_pops**2, num_pops**2]) IA = np.array(range(num_pops**2)).reshape([num_pops, num_pops]) Identity = np.eye(num_pops) for x in range(num_pops): for y in range(num_pops): P[IA[x, y], IA[x, y]] = num_samples[x] * (num_samples[y] - (x == y)) P = P / np.sum(P) # add epoch breaks if not there already but remember which steps they are epoch_breaks = list( set([0.0] + [t for t in self.epoch_start_time if t not in steps]) ) steps_b = np.concatenate([steps, epoch_breaks]) ix = np.argsort(steps_b) steps_b = steps_b[ix] keep_steps = np.concatenate( [np.repeat(True, len(steps)), np.repeat(False, len(epoch_breaks))] )[ix] assert np.all(steps == steps_b[keep_steps]) mass_migration_objects = [] mass_migration_times = [] for demo in self.demography.events: if isinstance(demo, LineageMovementEvent): # Convert higher-level lineage movement events like Admixtures # and PopulationSplits into LineageMovement instances. These are # equivalent to MassMigrations for lm in demo._as_lineage_movements(): mass_migration_objects.append(lm) mass_migration_times.append(demo.time) num_steps = len(steps_b) # recall that steps_b[0] = 0.0 r = np.zeros(num_steps) p_t = np.zeros(num_steps) for j in range(num_steps - 1): time = steps_b[j] dt = steps_b[j + 1] - steps_b[j] N, M = self._pop_size_and_migration_at_t(time) C = np.zeros([num_pops**2, num_pops**2]) for idx in range(num_pops): C[IA[idx, idx], IA[idx, idx]] = 1 / (2 * max(min_pop_size, N[idx])) dM = np.diag([sum(s) for s in M]) for mmt, mmo in zip(mass_migration_times, mass_migration_objects): if mmt == time: a = mmo.source b = mmo.dest p = mmo.proportion S = np.eye(num_pops**2, num_pops**2) for x in range(num_pops): if x == a: S[IA[a, a], IA[a, b]] = S[IA[a, a], IA[b, a]] = p * (1 - p) S[IA[a, a], IA[b, b]] = p**2 S[IA[a, a], IA[a, a]] = (1 - p) ** 2 else: S[IA[x, a], IA[x, b]] = S[IA[a, x], IA[b, x]] = p S[IA[x, a], IA[x, a]] = S[IA[a, x], IA[a, x]] = 1 - p P = np.matmul(P, S) p_notcoal = np.sum(P) p_t[j] = p_notcoal if p_notcoal > 0: r[j] = np.sum(np.matmul(P, C)) / np.sum(P) else: r[j] = np.nan G = (np.kron(M - dM, Identity) + np.kron(Identity, M - dM)) - C P = np.matmul(P, _matrix_exponential(dt * G)) p_notcoal = np.sum(P) p_t[num_steps - 1] = p_notcoal if p_notcoal > 0: r[num_steps - 1] = np.sum(np.matmul(P, C)) / p_notcoal else: r[num_steps - 1] = np.nan return r[keep_steps], p_t[keep_steps] def _pop_size_and_migration_at_t(self, t): """ Returns a tuple (N, M) of population sizes (N) and migration rates (M) at time t ago. Note: this isn't part of the external API as it is be better to provide separate methods to access the population size and migration rates, and needing both together is specialised for internal calculations. :param float t: The time ago. :return: A tuple of arrays, of the same form as the population sizes and migration rate arrays of the demographic model. """ j = 0 while self.epochs[j].end_time <= t: j += 1 N = self.population_size_history[:, j] for i, pop in enumerate(self.epochs[j].populations): s = t - self.epochs[j].start_time g = pop.growth_rate N[i] *= np.exp(-1 * g * s) return N, self.epochs[j].migration_matrix