Source code for edo.operators.mutation

""" Functions related to the mutation operator. """

import pandas as pd

from edo.individual import Individual

from .util import get_family_counts


def mutation(individual, prob, row_limits, col_limits, families, weights=None):
    """Mutate an individual.

    Here, the characteristics of an individual can be split into two parts:
    their dimensions, and their values. Each of these parts is mutated in a
    different way using the same probability, ``prob``.

    The mutation is carried out on a copy of the individual's dataframe and a
    new list of its metadata, so the individual passed in is left untouched.
    The distribution objects inside the metadata list are shared, not copied.

    Parameters
    ----------
    individual : Individual
        The individual to be mutated. It must unpack into
        ``(dataframe, metadata)`` and expose a ``random_state`` attribute.
    prob : float
        The probability with which any characteristic of ``individual``
        should be mutated.
    row_limits : list
        Lower and upper limits on the number of rows an individual can have.
    col_limits : list
        Lower and upper limits on the number of columns an individual can
        have.
    families : list
        Families of distributions with which to create new columns.
    weights : list, optional
        Probabilities with which to sample a distribution from ``families``.
        If ``None``, sample uniformly.

    Returns
    -------
    mutant : Individual
        A (potentially) mutated individual, built from the mutated dataframe
        and metadata and the random state of ``individual``.
    """
    dataframe, metadata = individual
    random_state = individual.random_state

    # The column and value mutations below write into the objects they are
    # given (column assignment, ``metadata.append``/``pop``, ``iloc``
    # assignment), whereas the row mutations return new frames. Copying up
    # front keeps the caller's individual unchanged in every case.
    dataframe = dataframe.copy()
    metadata = list(metadata)

    dataframe, metadata = mutate_nrows(
        dataframe, metadata, row_limits, random_state, prob
    )
    dataframe, metadata = mutate_ncols(
        dataframe, metadata, col_limits, families, weights, random_state, prob
    )
    dataframe = mutate_values(dataframe, metadata, random_state, prob)

    return Individual(dataframe, metadata, random_state)
def mutate_nrows(dataframe, metadata, row_limits, random_state, prob):
    """Randomly grow and/or shrink the number of rows of an individual.

    Two draws are taken from ``random_state``, one per operation. A row is
    added when the first draw falls below ``prob`` and the dataframe has
    fewer rows than ``row_limits[1]``; afterwards a row is removed when the
    second draw falls below ``prob`` and the dataframe has more rows than
    ``row_limits[0]``. Both operations may happen in a single call.

    Returns
    -------
    dataframe, metadata : tuple
        The (possibly resized) dataframe and the metadata it was given.
    """
    # Each draw is made before its limit is checked, so exactly two numbers
    # are always consumed from the random stream.
    wants_extra_row = random_state.random() < prob
    if wants_extra_row and dataframe.shape[0] < row_limits[1]:
        dataframe = _add_row(dataframe, metadata, random_state)

    wants_fewer_rows = random_state.random() < prob
    if wants_fewer_rows and dataframe.shape[0] > row_limits[0]:
        dataframe = _remove_row(dataframe, random_state)

    return dataframe, metadata
def mutate_ncols(
    dataframe, metadata, col_limits, families, weights, random_state, prob
):
    """Randomly grow and/or shrink the number of columns of an individual.

    Each entry of ``col_limits`` is either a single number or a tuple; a
    tuple is summed to obtain the overall bound. A column is added when a
    draw from ``random_state`` falls below ``prob`` and the dataframe has
    fewer columns than the upper bound. Afterwards a column is removed when
    a second draw falls below ``prob`` and the dataframe has more columns
    than the lower bound. In either case the bounds cannot be exceeded.

    Returns
    -------
    dataframe, metadata : tuple
        The (possibly resized) dataframe and its matching metadata.
    """
    upper = col_limits[1]
    max_cols = sum(upper) if isinstance(upper, tuple) else upper
    has_room = dataframe.shape[1] < max_cols
    # The draw comes first in the test so it is consumed even at the bound.
    if random_state.random() < prob and has_room:
        dataframe, metadata = _add_col(
            dataframe, metadata, col_limits, families, weights, random_state
        )

    # The lower bound is checked against the width after any addition above.
    lower = col_limits[0]
    min_cols = sum(lower) if isinstance(lower, tuple) else lower
    has_spare = dataframe.shape[1] > min_cols
    if random_state.random() < prob and has_spare:
        dataframe, metadata = _remove_col(
            dataframe, metadata, col_limits, families, random_state
        )

    return dataframe, metadata
def mutate_values(dataframe, metadata, random_state, prob):
    """Resample individual cells of ``dataframe``, each with probability
    ``prob``.

    Cells are visited column by column, top to bottom. For every cell one
    draw is taken from ``random_state``; when it falls below ``prob`` the
    cell is overwritten with a fresh value sampled from the distribution at
    the same position in ``metadata``. The dataframe is modified in place
    and also returned.
    """
    nrows, ncols = dataframe.shape

    for col_pos in range(ncols):
        pdf = metadata[col_pos]
        for row_pos in range(nrows):
            # One draw per cell, in a fixed order, so results are
            # reproducible for a given random state.
            if random_state.random() < prob:
                dataframe.iloc[row_pos, col_pos] = pdf.sample(
                    1, random_state
                )[0]

    return dataframe
def _rename(dataframe):
    """Reset the row index and relabel the columns as ``0, 1, ..., n - 1``.

    Called after a row or column has been added or dropped so that labels
    match positions again. Returns the relabelled dataframe.
    """
    dataframe = dataframe.reset_index(drop=True)
    dataframe.columns = list(range(dataframe.shape[1]))
    return dataframe


def _add_row(dataframe, metadata, random_state):
    """Return a new dataframe with one extra row appended.

    The new row holds one value sampled from each distribution in
    ``metadata``, keyed by that distribution's position in the list. The
    dataframe passed in is not modified.
    """
    new_row = pd.DataFrame(
        [{i: pdf.sample(1, random_state)[0] for i, pdf in enumerate(metadata)}]
    )
    # ``DataFrame.append`` was removed in pandas 2.0; ``pd.concat`` with
    # ``ignore_index=True`` is its replacement and renumbers the rows.
    return pd.concat([dataframe, new_row], ignore_index=True)


def _remove_row(dataframe, random_state):
    """Return a new dataframe with one row, chosen at random, removed."""
    line = random_state.choice(dataframe.index)
    return _rename(dataframe.drop(line, axis=0))


def _add_col(dataframe, metadata, col_limits, families, weights, random_state):
    """Add a new column to the end of the dataframe.

    A family is sampled from ``families`` according to ``weights``. When
    ``col_limits[1]`` is a tuple it is read as a per-family upper limit (in
    the order of ``families``), and sampling is repeated until a family
    below its limit is drawn. An instance of that family is then created,
    ``nrows`` values are sampled from it to form the new column, and the
    instance is appended to ``metadata``.

    Both ``dataframe`` and ``metadata`` are modified in place. Returns the
    relabelled dataframe and the metadata list.
    """
    nrows, ncols = dataframe.shape
    upper = col_limits[1]

    if isinstance(upper, tuple):
        family_counts = get_family_counts(metadata, families)
        # Redraw until a family with room left is chosen. The caller is
        # expected to have checked that the overall upper bound is not met.
        while True:
            family = random_state.choice(families, p=weights)
            if family_counts[family] < upper[families.index(family)]:
                break
    else:
        family = random_state.choice(families, p=weights)

    pdf = family.make_instance(random_state)
    dataframe[ncols] = pdf.sample(nrows, random_state)
    metadata.append(pdf)

    return _rename(dataframe), metadata


def _remove_col(dataframe, metadata, col_limits, families, random_state):
    """Remove a column (and its metadata) from an individual at random.

    When ``col_limits[0]`` is a tuple it is read as a per-family lower limit
    (in the order of ``families``), and columns are redrawn until one is
    found whose family currently has more columns than its limit.

    ``metadata`` is modified in place; the dataframe is replaced by a new,
    relabelled one. Returns the dataframe and the metadata list.
    """
    lower = col_limits[0]

    if isinstance(lower, tuple):
        family_counts = get_family_counts(metadata, families)
        # Redraw until the column's family can spare one. The caller is
        # expected to have checked that the overall lower bound is not met.
        while True:
            col = random_state.choice(dataframe.columns)
            idx = dataframe.columns.get_loc(col)
            family = metadata[idx].family
            if family_counts[family] > lower[families.index(family)]:
                break
    else:
        col = random_state.choice(dataframe.columns)
        idx = dataframe.columns.get_loc(col)

    dataframe = _rename(dataframe.drop(col, axis=1))
    metadata.pop(idx)

    return dataframe, metadata