import msprime
import tskit
import warnings
import numpy as np
from .slim_tree_sequence import *
from .slim_metadata import *
from .provenance import *
from .util import *
[docs]def recapitate(ts,
ancestral_Ne=None,
**kwargs
):
'''
Returns a "recapitated" tree sequence, by using msprime to run a
coalescent simulation from the "top" of this tree sequence, i.e.,
allowing any uncoalesced lineages to coalesce.
To allow recapitation to be done correctly, the nodes of the
first generation of the SLiM simulation from whom all samples inherit
are still present in the tree sequence, but are not marked as samples.
If you simplify the tree sequence before recapitating you must ensure
these are not removed, which you do by passing the argument
``keep_input_roots=True`` to :meth:`simplify <tskit.TreeSequence.simplify>`.
If you specify an ``ancestral_Ne``, then the recapitated portion of the
tree sequence will be simulated in a single population with this
(diploid) size. In other words, all lineages are moved to a single
population of this size (named "ancestral" if this name is not already
taken), and coalescence is allowed to happen.
You may control the ancestral demography by passing in a ``demography``
argument: see :func:`msprime.sim_ancestry`.
In general, all defaults are whatever the defaults of
:func:`msprime.sim_ancestry` are; this includes recombination rate, so
that if neither ``recombination_rate`` or a ``recombination_map`` are
provided, there will be *no* recombination.
:param tskit.TreeSequence ts: The tree sequence to transform.
:param float ancestral_Ne: If specified, then will simulate from a single
ancestral population of this size. It is an error to specify this
as well as ``demography``.
:param dict kwargs: Any other arguments to :func:`msprime.sim_ancestry`.
'''
is_current_version(ts, _warn=True)
if ancestral_Ne is not None:
if "demography" in kwargs:
raise ValueError("You cannot specify both `demography` and `ancestral_Ne`.")
recap_time = ts.metadata['SLiM']['tick']
# In various circumstances depending on the stage in which the simulation was
# started and when the tree sequence was written out, the time of the roots
# might be one or even two less than the "tick" value. We have access
# to the stage the tree sequence was written out, but not the time it
# was *started*. So, we'll just check if we need to subtract one or two.
# Consistency checking this requires looping over all the trees, unfortunately,
# but it avoids some common errors.
root_times = list(set([ts.node(n).time for t in ts.trees() for n in t.roots]))
for adj in (1, 2):
if np.abs(recap_time - root_times[0] - adj) < 1e-8:
recap_time -= adj
if len(root_times) > 1 or not np.abs(root_times[0] - recap_time) < 1e-8:
message = (
"Not all roots of the provided tree sequence are at the time expected "
"by recapitate(). This could happen if you've simplified in "
"python before recapitating (fix: don't simplify first). "
"If could also happen in other situations, e.g., "
"you added new individuals without parents in SLiM "
"during the course of the simulation with sim.addSubPop(), "
"in which case you will probably need to recapitate with "
"msprime.sim_ancestry(initial_state=ts, ...). "
f"(Expected root time: {recap_time}; "
f"Observed root times: {root_times})"
)
raise ValueError(message)
demography = msprime.Demography.from_tree_sequence(ts)
# must set pop sizes to >0 even though we merge immediately
for pop in demography.populations:
pop.initial_size=1.0
ancestral_name = "ancestral"
derived_names = [pop.name for pop in demography.populations]
while ancestral_name in derived_names:
ancestral_name = (ancestral_name + "_ancestral")
demography.add_population(
name=ancestral_name,
description="ancestral population simulated by msprime",
initial_size=ancestral_Ne,
)
# the split has to come slightly longer ago than slim's tick
# since that's when all the linages are at, and otherwise the event
# won't apply to them
demography.add_population_split(
np.nextafter( recap_time, 2 * recap_time),
derived=derived_names,
ancestral=ancestral_name,
)
kwargs["demography"] = demography
recap = msprime.sim_ancestry(
initial_state = ts,
**kwargs)
return recap
[docs]def convert_alleles(ts):
"""
Returns a modified tree sequence in which alleles have been replaced by
their corresponding nucleotides. For sites, SLiM-produced tree sequences
have "" (the empty string) for the ancestral state at each site; this method
will replace this with the corresponding nucleotide from the reference sequence.
For mutations, SLiM records the 'derived state' as a SLiM mutation ID; this
method will this with the nucleotide from the mutation's metadata.
This operation is not reversible: since SLiM mutation IDs are lost, the tree
sequence will not be able to be read back into SLiM.
The main purpose of this method is for output: for instance, this code will produce
a VCF file with nucleotide alleles:
.. code-block:: python
nts = pyslim.convert_alleles(ts)
with open('nucs.vcf', 'w') as f:
nts.write_vcf(f)
This method will produce an error if the tree sequence does not have a
valid reference sequence or if any mutations do not have nucleotides: to first
generate these, see :func:`.generate_nucleotides`.
:param tskit.TreeSequence ts: The tree sequence to transform.
"""
tables = ts.dump_tables()
has_refseq = (
ts.has_reference_sequence()
and len(ts.reference_sequence.data) > 0
)
if not has_refseq:
raise ValueError("Tree sequence must have a valid reference sequence.")
# unfortunately, nucleotide mutations may be stacked (e.g., substitutions
# will appear this way) and they don't appear in any particular order;
# so we must guess which is the most recent, by choosing the one that
# has the largest SLiM time, doesn't appear in the parent list, or has
# the lagest SLiM ID.
nuc_inds = tables.mutations.metadata_vector(['mutation_list', 0, 'nucleotide'], dtype='int')
num_stacked = np.array([len(m.metadata['mutation_list']) for m in ts.mutations()])
for k in np.where(num_stacked > 1)[0]:
mut = ts.mutation(k)
if mut.parent == tskit.NULL:
pids = []
else:
pids = ts.mutation(mut.parent).derived_state.split(",")
x = [
(
md['slim_time'],
i not in pids,
int(i),
j
) for j, (i, md) in
enumerate(
zip(mut.derived_state.split(","), mut.metadata['mutation_list'])
)
]
x.sort()
j = x[-1][3]
nuc_inds[k] = mut.metadata['mutation_list'][j]['nucleotide']
if np.any(nuc_inds == -1):
raise ValueError("All mutations must be nucleotide mutations.")
da = np.array(NUCLEOTIDES)[nuc_inds]
tables.mutations.packset_derived_state(da)
k = tables.sites.position.astype('int')
aa = np.frombuffer(ts.reference_sequence.data.encode('utf-8'), dtype='S1')[k]
tables.sites.packset_ancestral_state(aa.tobytes().decode('utf-8'))
return tables.tree_sequence()
[docs]def generate_nucleotides(ts, reference_sequence=None, keep=True, seed=None):
"""
Returns a modified tree sequence in which mutations have been randomly assigned nucleotides
and (optionally) a reference sequence has been randomly generated.
If ``reference_sequence`` is a string of nucleotides (A, C, G, and T) of
length equal to the sequence length, this is used for the reference
sequence. If no reference sequence is given, the reference_sequence property
of ts is used if present; if not then a sequence of independent and
uniformly random nucleotides is generated.
SLiM stores the nucleotide as an integer in the mutation metadata, with -1 meaning "not
a nucleotide mutation". This method assigns nucleotides by stepping through
each mutation and picking a random nucleotide uniformly out of the three
possible nucleotides that differ from the parental state (i.e., the derived
state of the parental mutation, or the ancestral state if the mutation has
no parent). If ``keep=True`` (the default), the mutations that already have a
nucleotide (i.e., an integer 0-3 in metadata) will not be modified.
Technical note: in the case of stacked mutations, the SLiM mutation that
determines the nucleotide state of the (tskit) mutation is the one with the largest
slim_time attribute. This method tries to assign nucleotides so that each mutation
differs from the previous state, but this is not always possible in certain
unlikely cases.
:param tskit.TreeSequence ts: The tree sequence to transform.
:param bool reference_sequence: A reference sequence, or None to use an existing reference,
or to randomly generate one.
:param bool keep: Whether to leave existing nucleotides in mutations that already have one.
:param int seed: The random seed for generating new alleles.
"""
rng = np.random.default_rng(seed=seed)
if reference_sequence is None:
if not ts.has_reference_sequence():
reference_sequence = rng.choice(
np.array([65, 67, 71, 84], dtype=np.int8),
int(ts.sequence_length),
replace=True,
).tobytes().decode('ascii')
else:
if len(reference_sequence) != ts.sequence_length:
raise ValueError("Reference sequence must have length equal to sequence_length.")
if len([x for x in reference_sequence if x not in NUCLEOTIDES]) > 0:
raise ValueError("Reference sequence must be a string of A, C, G, and T only.")
tables = ts.dump_tables()
if reference_sequence is not None:
tables.reference_sequence.data = reference_sequence
tables.mutations.clear()
sets = [[k for k in range(4) if k != i] for i in range(4)]
states = np.full((ts.num_mutations,), -1)
k = tables.sites.position.astype('int')
aa_list = np.searchsorted(
NUCLEOTIDES,
np.frombuffer(tables.reference_sequence.data.encode('utf-8'), dtype='S1')[k],
)
for site in ts.sites():
aa = aa_list[site.id]
muts = {}
for mut in site.mutations:
if mut.parent == tskit.NULL:
pa = aa
pds = []
else:
pa = states[mut.parent]
pds = ts.mutation(mut.parent).derived_state.split(",")
this_da = pa
ml = mut.metadata
max_time = -np.inf
for i, md in zip(mut.derived_state.split(","), ml['mutation_list']):
da = md['nucleotide']
if da == -1 or not keep:
if i in muts:
da = muts[i]
else:
da = sets[pa][rng.integers(3)]
md['nucleotide'] = da
muts[i] = da
# the official nucleotide state is from the SLiM mutation with
# the largest slim_time attribute that was not present in the parent
# mutation
if md["slim_time"] >= max_time and i not in pds:
this_da = da
max_time = md["slim_time"]
states[mut.id] = this_da
tables.mutations.append(mut.replace(metadata=ml))
return tables.tree_sequence()
[docs]def individual_ages(ts):
"""
Returns the ages of all individuals in the tree sequence, extracted
from metadata. The result is a array of length equal to the number of
individuals, with k-th entry equal to ``ts.individual(k).metadata["age"]``.
:return: An array of ages of individuals.
"""
if ts.metadata['SLiM']['model_type'] != "WF":
ages = ts.tables.individuals.metadata_vector("age")
else:
ages = np.zeros(ts.num_individuals, dtype='int')
return ages
[docs]def individuals_alive_at(ts, time, stage='late', remembered_stage=None,
population=None, samples_only=False):
"""
Returns an array giving the IDs of all individuals that are known to be
alive at the given time ago. This is determined using their birth time
ago (given by their `time` attribute) and, for nonWF models,
their `age` attribute (which is equal to their age at the last time
they were Remembered). See also {func}`.individual_ages_at`.
In WF models, birth occurs after "early()", so that individuals are only
alive during "late()" for the time step when they have age zero,
while in nonWF models, birth occurs before "early()", so they are alive
for both stages.
In both WF and nonWF models, mortality occurs between
"early()" and "late()", so that individuals are last alive during the
"early()" stage of the time step of their final age, and if individuals
are alive during "late()" they will also be alive during "first()" and
"early()" of the next time step. This means it is important to know during
which stage individuals were Remembered - for instance, if the call to
sim.treeSeqRememberIndividuals() was made during "early()" of a given time step,
then those individuals might not have survived until "late()" of that
time step. Since SLiM does not record the stage at which individuals
were Remembered, you can specify this by setting ``remembered_stages``:
it should be the stage during which *all* calls to
``sim.treeSeqRememberIndividuals()``, as well as to ``sim.treeSeqOutput()``,
were made.
Note also that in nonWF models, birth occurs between "first()" and
"early()", so the possible parents in a given time step are those that are
alive in "early()" and have age greater than zero, or, equivalently, are
alive in "late()" during the previous time step.
In WF models, birth occurs after "early()", so possible parents in a
given time step are those that are alive during "early()" of that time
step or are alive during "late()" of the previous time step.
Since individuals may be created not during the usual 'birth' stage
by `addSubPop( )`, and the stage at which they are created is not stored,
results may be unreliable for the first-generation individuals.
:param tskit.TreeSequence ts: A tree sequence.
:param float time: The number of ticks (i.e., time steps) ago.
:param str stage: The stage in the SLiM life cycle that we are inquiring
about (either "first", "early" or "late"; defaults to "late").
:param str remembered_stage: The stage in the SLiM life cycle
during which individuals were Remembered (defaults to the stage the
tree sequence was recorded at, stored in metadata).
:param int population: If given, return only individuals in the
population(s) with these population ID(s).
:param bool samples_only: Whether to return only individuals who have at
least one node marked as samples.
"""
is_current_version(ts, _warn=True)
if stage not in ("late", "early", "first"):
raise ValueError(f"Unknown stage '{stage}': "
"should be either 'first', 'early' or 'late'.")
if remembered_stage is None:
remembered_stage = ts.metadata['SLiM']['stage']
if remembered_stage not in ("late", "early", "first"):
raise ValueError(f"Unknown remembered_stage '{remembered_stage}': "
"should be either 'first', 'early' or 'late'.")
if remembered_stage != ts.metadata['SLiM']['stage']:
warnings.warn(f"Provided remembered_stage '{remembered_stage}' does not"
" match the stage at which the tree sequence was saved"
f" ('{ts.metadata['SLiM']['stage']}'). This is not necessarily"
" an error, but mismatched stages will lead to inconsistencies:"
" make sure you know what you're doing.")
# An individual's tskit time is the tskit counter at the time of their birth.
# The tskit counter clicks once at the start of each reproduction bout.
# In a WF model, individuals are alive for all stages with the same value
# of the tskit counter. In a nonWF model, an individual of age `a` will be
# alive for `a + 1` 'early' events, and `a` 'late' and 'first' events,
# starting with the one having the same value of the tskit counter.
# Also note that if remembered_stage is 'late', then they are recorded
# before their age counter clicks, so they will actually have age one greater.
# Since `time` is in "number of time steps ago",
# we are inquiring about SLiM tick `n - time`, where `n` is
# the value of `ts.metadata["SLiM"]["tick"]`. To convert this along with
# our stage information into a tskit time `t`,
# let x = 1 if the stage is 'first' or (is 'early' and WF)
# and y = 1 if remembered stage is 'late' or (is 'early' and nonWF);
# then t = time + x + y - 1 .
is_wf = (ts.metadata['SLiM']['model_type'] == "WF")
x = (stage == "first" or (stage == "early" and is_wf))
y = (remembered_stage == "late" or (remembered_stage == "early" and not is_wf))
t = time + x + y - 1
birth_times = ts.individuals_time
ages = individual_ages(ts)
if is_wf:
alive_bool = (birth_times == t)
else:
age_offset = (stage == "early") + (remembered_stage == "late")
alive_bool = np.logical_and(
birth_times >= t,
birth_times - ages < t + age_offset
)
if population is not None:
alive_bool &= np.isin(
ts.individuals_population,
population
)
if samples_only:
nodes = ts.tables.nodes
alive_bool &= (
0 < np.bincount(1 + nodes.individual,
nodes.flags & tskit.NODE_IS_SAMPLE,
minlength=1 + ts.num_individuals)[1:]
)
return np.where(alive_bool)[0]
[docs]def individual_ages_at(ts, time, stage="late", remembered_stage="late"):
"""
Returns the `ages` of each individual at the corresponding time ago,
which will be ``nan`` if the individual is either not born yet or dead.
This is computed as the time ago the individual was born (found by the
`time` associated with the the individual's nodes) minus the `time`
argument; while "death" is inferred from the individual's ``age``,
recorded in metadata. These values are the same as what would be shown
in SLiM during the corresponding time step and stage.
Since age increments at the end of each time step,
the age is the number of time steps ends the individual has lived
through, so if they were born in time step `time`, then their age
will be zero.
In a WF model, this method does not provide any more information than
does {func}`.individuals_alive_at`, but for consistency, non-nan ages
will be 0 in "late" and 1 in "first" and "early".
See {func}`.individuals_alive_at` for further discussion.
:param tskit.TreeSequence ts: A tree sequence.
:param float time: The reference time ago.
:param str stage: The stage in the SLiM life cycle used to determine who
is alive (either "early" or "late"; defaults to "late").
:param str remembered_stage: The stage in the SLiM life cycle during which
individuals were Remembered.
"""
ages = np.repeat(np.nan, ts.num_individuals)
alive = individuals_alive_at(
ts,
time,
stage=stage,
remembered_stage=remembered_stage
)
# to convert individuals_time to number of ticks ago we subtract (y - 1), so
is_wf = (ts.metadata['SLiM']['model_type'] == "WF")
y = (remembered_stage == "late" or (remembered_stage == "early" and not is_wf))
t = time + y - 1
ages[alive] = ts.individuals_time[alive] - t
return ages
[docs]def slim_time(ts, time, stage="late"):
"""
Converts the given "tskit times" (i.e., in units of time before the end
of the simulation) to SLiM times (those recorded by SLiM, usually in units
of ticks since the start of the simulation). Although the latter are
always integers, these will not be if the provided times are not integers.
When the tree sequence is written out, SLiM records the current
current tick in the metadata:
``ts.metadata['SLiM']['tick']``. In most cases, the "SLiM time"
referred to by a time ago in the tree sequence (i.e., the value that would
be reported by community.tick within SLiM at the point in time thus
referenced) can be obtained by subtracting that time ago from
``ts.metadata['SLiM']['tick']``. However, in WF models, birth
happens between the “early()” and “late()” stages, so if the tree
sequence was written out using sim.treeSeqOutput() during “early()” in
a WF model, the tree sequence’s times measure time before the last set
of individuals are born, i.e., before SLiM time step
``ts.metadata['SLiM']['tick'] - 1``. The same thing applies to
the "first" stage for both WF and nonWF models.
In some situations (e.g., mutations added during early() in WF models)
this may not return what you expect. See :ref:`sec_metadata_converting_times`
for more discussion.
:param tskit.TreeSequence ts: A SLiM-compatible TreeSequence.
:param numpy.ndarray time: An array of times to be converted.
:param str stage: The stage of the SLiM life cycle that the SLiM time
should be computed for.
"""
is_current_version(ts, _warn=True)
is_wf = (ts.metadata['SLiM']['model_type'] == "WF")
remembered_stage = ts.metadata['SLiM']['stage']
x = (stage == "first" or (stage == "early" and is_wf))
y = (remembered_stage == "late" or (remembered_stage == "early" and not is_wf))
slim_time = ts.metadata['SLiM']['tick'] - time + x + y - 1
return slim_time
def _do_individual_parents_stuff(ts, return_parents=False):
# Helper for has_individual_parents and individual_parents,
# which share a lot of machinery.
tables = ts.tables
edges = tables.edges
nodes = tables.nodes
edge_parent_indiv = nodes.individual[edges.parent]
edge_child_indiv = nodes.individual[edges.child]
# nodes whose parent nodes are all in the same individual
unique_parent_nodes = unique_labels_by_group(
edges.child,
edge_parent_indiv,
minlength=nodes.num_rows)
unique_parent_edges = unique_parent_nodes[edges.child]
# edges describing relationships between individuals
indiv_edges = np.logical_and(
np.logical_and(edge_parent_indiv != tskit.NULL,
edge_child_indiv != tskit.NULL),
unique_parent_edges)
# individual edges where the parent was alive during "late"
# of the time step before the child is born
ind_times = ts.individuals_time
ind_ages = individual_ages(ts)
child_births = ind_times[edge_child_indiv[indiv_edges]]
parent_births = ind_times[edge_parent_indiv[indiv_edges]]
alive_edges = indiv_edges.copy()
if ts.metadata['SLiM']['model_type'] == "WF":
alive_edges[indiv_edges] = (child_births + 1 == parent_births)
else:
parent_deaths = parent_births - ind_ages[edge_parent_indiv[indiv_edges]]
alive_edges[indiv_edges] = (child_births + 1 >= parent_deaths)
edge_spans = edges.right - edges.left
parental_span = np.bincount(edge_child_indiv[alive_edges],
weights=edge_spans[alive_edges], minlength=ts.num_individuals)
# we could also check for edges without individual parents terminating
# in this individual, but this is unnecessary as the entire genome is
# accounted for
has_all_parents = (parental_span == 2 * ts.sequence_length)
if return_parents:
full_parent_edges = np.logical_and(
alive_edges,
has_all_parents[edge_child_indiv])
parents = np.unique(np.column_stack(
[edge_parent_indiv[full_parent_edges],
edge_child_indiv[full_parent_edges]]
), axis=0)
return parents
else:
return has_all_parents
[docs]def individual_parents(ts):
'''
Finds all parent-child relationships in the tree sequence (as far as we
can tell). The output will be a two-column array with row [i,j]
indicating that individual i is a parent of individual j. See
{func}`.has_individual_parents` for exactly which parents are returned.
See {func}`.individuals_alive_at` for further discussion about how
this is determined based on when the individuals were Remembered.
:param tskit.TreeSequence ts: A :class:`tskit.TreeSequence`.
:return: An array of individual IDs, with row [i, j] if individual i is
a parent of individual j.
'''
return _do_individual_parents_stuff(ts, return_parents=True)
[docs]def has_individual_parents(ts):
'''
Finds which individuals have both their parent individuals also present
in the tree sequence, as far as we can tell. To do this, we return a
boolean array with True for those individuals for which:
- all edges terminating in that individual's nodes are in individuals,
- each of the individual's nodes inherit from a single individual only,
- those parental individuals were alive when the individual was born,
- the parental individuals account for two whole genomes.
This returns a boolean array indicating for each individual whether all
these are true. Note in particular that individuals with only *one*
recorded parent are *not* counted as "having parents".
See {func}`.individuals_alive_at` for further discussion about how
this is determined based on when the individuals were Remembered.
:param tskit.TreeSequence ts: A :class:`tskit.TreeSequence`.
:return: A boolean array of length equal to ``targets``.
'''
return _do_individual_parents_stuff(ts, return_parents=False)
[docs]def annotate(ts, **kwargs):
'''
Takes a tree sequence (as produced by msprime, for instance), and adds in the
information necessary for SLiM to use it as an initial state, filling in
mostly default values. Returns a :class:`tskit.TreeSequence`.
:param tskit.TreeSequence ts: A :class:`tskit.TreeSequence`.
:param str model_type: SLiM model type: either "WF" or "nonWF".
:param int tick: What tick number in SLiM correponds to
``time=0`` in the tree sequence.
:param int cycle: What cycle number in SLiM correponds to
``time=0`` in the tree sequence (default: equal to ``tick``).
:param int stage: What stage in SLiM's cycle has the tree sequence
been written out in (defaults to "early"; must be "early" or "late").
:param str reference_sequence: A reference sequence of length
equal to ts.sequence_length.
:param bool annotate_mutations: Whether to replace mutation metadata
with defaults. (If False, the mutation table is unchanged.)
'''
tables = ts.dump_tables()
annotate_tables(tables, **kwargs)
return tables.tree_sequence()
[docs]def annotate_tables(tables, model_type, tick, cycle=None, stage="early", reference_sequence=None,
annotate_mutations=True):
'''
Does the work of :func:`annotate`, but modifies the tables in place: so,
takes tables as produced by ``msprime``, and makes them look like the
tables as output by SLiM. See :func:`annotate` for details.
'''
if stage not in ("early", "late"):
raise ValueError(f"stage must be 'early' or 'late' (provided {stage})")
if (type(tick) is not int) or (tick < 1):
raise ValueError("tick must be an integer and at least 1.")
# set_nodes must come before set_populations
if model_type == "WF":
default_ages = -1
elif model_type == "nonWF":
default_ages = 0
else:
raise ValueError("Model type must be 'WF' or 'nonWF'")
if cycle is None:
cycle = tick
if not np.allclose(tables.sites.position, np.floor(tables.sites.position)):
raise ValueError(
"Site positions in this tree sequence are not at integer values, "
"but must be for loading into SLiM: generate mutations with "
"sim_mutations(..., discrete_genome=True), not simulate()."
)
top_metadata = default_slim_metadata('tree_sequence')['SLiM']
top_metadata['model_type'] = model_type
top_metadata['tick'] = tick
top_metadata['cycle'] = cycle
top_metadata['stage'] = stage
set_tree_sequence_metadata(tables, **top_metadata)
set_metadata_schemas(tables)
_annotate_nodes_individuals(tables, age=default_ages)
_annotate_populations(tables)
if annotate_mutations:
_annotate_sites_mutations(tables)
if reference_sequence is not None:
tables.reference_sequence.data = reference_sequence
[docs]def next_slim_mutation_id(ts):
'''
Returns the next SLiM mutation ID for this tree sequence. This is useful
because if you want to add more mutations to your SLiM tree sequence using
:func:`msprime.sim_mutations`, you may need to specify the parameter
`next_id` in your :class:`msprime.SLiMMutationModel` to be larger than any
existing mutation IDs. Setting `next_id` equal to the output of this
function will allow the mutated tree sequence to be read in by SLiM.
To do this, recall that the "derived state" of SLiM's mutations are
comma-separated strings of mutation IDs; this function just parses all derived
states and returns one larger than the largest integer found. It will return an error
if it encounters derived states that are not comma-separated strings of integers.
'''
max_id = 0
for mut in ts.mutations():
ds = mut.derived_state
if len(ds) > 0:
for d in ds.split(","):
try:
max_id = max(max_id, int(d))
except ValueError:
raise ValueError(f"The derived state of a mutation ({ds}) in the tree "
"sequence is not a comma-separated list of values "
"coercible to int. This is not a valid SLiM tree sequence.")
return max_id + 1
def _annotate_nodes_individuals(tables, age):
'''
Adds to a TableCollection the information relevant to individuals required
for SLiM to load in a tree sequence, that is found in Node and Individual
tables. This will replace the metadata in those tables. For this to work,
assumes all sample nodes are in individuals.
This is designed to make it easy to assign default values:
- (location) individual locations to (0, 0, 0)
- (age) individual age to 0
- (ind_id) SLiM individual pedigree IDs to sequential integers starting from 0
- (ind_population) individual populations to 0
- (node_id) SLiM genome IDs to sequential integers starting with samples from 0
- (node_is_null) genomes to be non-null
- (node_type) genome type to 0 (= autosome)
- (ind_flags) INDIVIDUAL_ALIVE
If you have other situations, like non-alive "remembered" individuals, you
will need to edit the tables by hand, afterwards.
'''
ind_population = np.full(tables.individuals.num_rows, -1, dtype="int")
ind_slim_id = np.full(tables.individuals.num_rows, 0, dtype='int')
nid = 0
node_metadata = []
for j, n in enumerate(tables.nodes):
if n.flags & tskit.NODE_IS_SAMPLE > 0:
i = n.individual
if i == tskit.NULL:
raise ValueError("To annotate, samples must be in individuals.")
else:
ind_population[i] = n.population
ind_slim_id[i] = 1
md = default_slim_metadata("node")
md["slim_id"] = nid
nid += 1
else:
md = n.metadata
node_metadata.append(md)
nms = tables.nodes.metadata_schema
tables.nodes.packset_metadata([
nms.validate_and_encode_row(x)
for x in node_metadata
])
slim_ind = (ind_slim_id != 0)
ind_slim_id = np.cumsum(ind_slim_id) - 1
ind_metadata = []
ind_flags = tables.individuals.flags
for j, ind in enumerate(tables.individuals):
if slim_ind[j]:
md = default_slim_metadata("individual")
md["pedigree_id"] = int(ind_slim_id[j])
md["subpopulation"] = int(ind_population[j])
md["age"] = age
# no packset analogue for flags, so do it here
# and we don't have to resize for flags,
# so no big deal
ind_flags[j] |= INDIVIDUAL_ALIVE
else:
md = ind.metadata
ind_metadata.append(md)
tables.individuals.set_columns(
flags=ind_flags,
parents=tables.individuals.parents,
parents_offset=tables.individuals.parents_offset,
)
ims = tables.individuals.metadata_schema
tables.individuals.packset_metadata([
ims.validate_and_encode_row(x)
for x in ind_metadata
])
tables.individuals.packset_location(
[[0.0] * 3 if si else [] for si in slim_ind]
)
def _annotate_populations(tables):
'''
Adds to a TableCollection the information about populations required for SLiM
to load a tree sequence. This will replace anything already in the Population
table for populations referenced by nodes alive at time zero.
'''
alive_ind = (tables.individuals.flags & INDIVIDUAL_ALIVE > 0)
do_pops = np.unique(
tables.nodes.population[
np.logical_and(
tables.nodes.individual >= 0,
alive_ind[tables.nodes.individual]
)
]
)
for j, p in enumerate(tables.populations):
if j in do_pops:
if "slim_id" not in p.metadata:
md = default_slim_metadata("population")
md["slim_id"] = j
md["name"] = f"pop_{j}"
tables.populations[j] = p.replace(metadata=md)
def _annotate_sites_mutations(tables):
'''
Adds to a TableCollection the information relevant to mutations required
for SLiM to load in a tree sequence. This means adding to the metadata column
of the Mutation table, It will also
- give SLiM IDs to each mutation
- replace ancestral states with ""
This will replace any information already in the metadata or derived state
columns of the Mutation table. We set slim_time in metadata so that
- tick = floor(tskit time) + slim_time
'''
if len(tables.mutations.metadata) > 0:
warnings.warn(
"The provided tree sequence already has some mutations with "
"metadata; this metadata will be overwritten."
)
num_mutations = tables.mutations.num_rows
default_mut = default_slim_metadata("mutation_list_entry")
dsb, dso = tskit.pack_bytes([str(j).encode() for j in range(num_mutations)])
slim_time = tables.metadata["SLiM"]["tick"] - np.floor(tables.mutations.time).astype("int")
mms = tables.mutations.metadata_schema
mutation_metadata = [
mms.encode_row(
{"mutation_list":
[{"mutation_type": default_mut["mutation_type"],
"selection_coeff": default_mut["selection_coeff"],
"subpopulation": default_mut["subpopulation"],
"slim_time": st,
"nucleotide": default_mut["nucleotide"]
}]
})
for st in slim_time]
mdb, mdo = tskit.pack_bytes(mutation_metadata)
tables.mutations.set_columns(
site=tables.mutations.site,
node=tables.mutations.node,
time=tables.mutations.time,
derived_state=dsb,
derived_state_offset=dso,
parent=tables.mutations.parent,
metadata=mdb,
metadata_offset=mdo)
tables.sites.set_columns(
position=tables.sites.position,
ancestral_state=np.array([], dtype='int8'),
ancestral_state_offset=np.zeros(tables.sites.num_rows + 1, dtype='uint32'))