""" The evolutionary dataset optimisation algorithm class. """

from collections import defaultdict
from pathlib import Path

import dask.dataframe as dd
import numpy as np
import pandas as pd

from edo.fitness import get_population_fitness, write_fitness
from edo.individual import Individual
from edo.operators import selection, shrink
from edo.population import create_initial_population, create_new_population

class DataOptimiser:
    """The (evolutionary) dataset optimiser.

    A class that generates data for a given fitness function and evolutionary
    parameters.

    Parameters
    ----------
    fitness : func
        Any real-valued function that at least takes an instance of
        ``Individual`` as argument. Any further arguments should be passed in
        the ``fitness_kwargs`` parameter of the ``run`` method.
    size : int
        The size of the population to create.
    row_limits : list
        Lower and upper bounds on the number of rows a dataset can have.
    col_limits : list
        Lower and upper bounds on the number of columns a dataset can have.
        Tuples can also be used to specify the min/maximum number of columns
        there can be of each element in ``families``.
    families : list
        A list of ``edo.Family`` instances that handle the distribution
        classes used to populate the individuals in the EA.
    weights : list
        A set of relative weights on how to select elements from
        ``families``. If ``None``, they will be chosen uniformly.
    max_iter : int
        The maximum number of iterations to be carried out before
        terminating.
    best_prop : float
        The proportion of a population from which to select the "best"
        individuals to be parents.
    lucky_prop : float
        The proportion of a population from which to sample some "lucky"
        individuals to be parents. Defaults to ``0``.
    crossover_prob : float
        The probability with which to sample dimensions from the first parent
        over the second in a crossover operation. Defaults to ``0.5``.
    mutation_prob : float
        The probability of a particular characteristic of an individual being
        mutated. If using a ``dwindle`` method, this is an initial
        probability.
    shrinkage : float
        The relative size to shrink each parameter's limits by for each
        distribution in ``families``. Defaults to ``None`` but must be
        between 0 and 1 (exclusive).
    maximise : bool
        Determines whether ``fitness`` is a function to be maximised or not.
        Fitness scores are minimised by default.
    """

    def __init__(
        self,
        fitness,
        size,
        row_limits,
        col_limits,
        families,
        weights=None,
        max_iter=100,
        best_prop=0.25,
        lucky_prop=0,
        crossover_prob=0.5,
        mutation_prob=0.01,
        shrinkage=None,
        maximise=False,
    ):

        self.fitness = fitness
        self.size = size
        self.row_limits = row_limits
        self.col_limits = col_limits
        self.families = families
        self.weights = weights
        self.max_iter = max_iter
        self.best_prop = best_prop
        self.lucky_prop = lucky_prop
        self.crossover_prob = crossover_prob
        self.mutation_prob = mutation_prob
        self.shrinkage = shrinkage
        self.maximise = maximise

        # Run-time state; populated and updated by ``run``.
        self.converged = False
        self.generation = 0
        self.population = None
        self.pop_fitness = None
        self.pop_history = []
        self.fit_history = pd.DataFrame()

    def stop(self, **kwargs):
        """A placeholder for a function which acts as a stopping condition on
        the EA."""

    def dwindle(self, **kwargs):
        """A placeholder for a function which can adjust (typically, reduce)
        the mutation probability over the run of the EA."""

    def run(
        self,
        root=None,
        random_state=None,
        processes=None,
        fitness_kwargs=None,
        stop_kwargs=None,
        dwindle_kwargs=None,
    ):
        """Run the evolutionary algorithm under the given constraints.

        Parameters
        ----------
        root : str, optional
            The directory in which to write all generations to file. If
            ``None``, nothing is written to file. Instead, every generation
            is kept in memory and is returned at the end. If writing to file,
            one generation is held in memory at a time and everything is
            returned upon termination as a tuple containing ``dask`` objects.
        random_state : int or np.random.RandomState, optional
            The random seed or state for a particular run of the algorithm.
            If ``None``, the default PRNG is used.
        processes : int, optional
            The number of parallel processes to use when calculating the
            population fitness. If ``None`` then a single-thread scheduler is
            used.
        fitness_kwargs : dict, optional
            Any additional parameters for the fitness function should be
            placed here.
        stop_kwargs : dict, optional
            Any additional parameters for the ``stop`` method should be
            placed here.
        dwindle_kwargs : dict, optional
            Any additional parameters for the ``dwindle`` method should be
            placed here.

        Returns
        -------
        pop_history : list
            Every individual in each generation as a nested list of
            ``Individual`` instances.
        fit_history : ``pd.DataFrame`` or ``dask.dataframe.DataFrame``
            Every individual's fitness in each generation.
        """

        if fitness_kwargs is None:
            fitness_kwargs = {}
        if stop_kwargs is None:
            stop_kwargs = {}
        if dwindle_kwargs is None:
            dwindle_kwargs = {}

        # NumPy integer seeds (e.g. ``np.int64``) are not ``int`` instances;
        # accept them too so they are not silently replaced by the global
        # PRNG.
        if isinstance(random_state, (int, np.integer)):
            self.random_state = np.random.RandomState(random_state)
        elif isinstance(random_state, np.random.RandomState):
            self.random_state = random_state
        else:
            self.random_state = np.random.mtrand._rand

        self._initialise_run(processes, **fitness_kwargs)
        self._update_histories(root)
        self.stop(**stop_kwargs)

        while self.generation < self.max_iter and not self.converged:

            self.generation += 1
            self._get_next_generation(processes, **fitness_kwargs)
            self._update_histories(root)
            self.stop(**stop_kwargs)
            self.dwindle(**dwindle_kwargs)

        if root is not None:
            distributions = [family.distribution for family in self.families]
            # Generations ``0`` to ``self.generation`` (inclusive) have been
            # written, so ``self.generation + 1`` generations must be read
            # back to match the in-memory history and the fitness history.
            self.pop_history = _get_pop_history(
                root, self.generation + 1, distributions
            )
            self.fit_history = _get_fit_history(root)

        return self.pop_history, self.fit_history

    def _initialise_run(self, processes, **fitness_kwargs):
        """Create the initial population and get its fitness.

        A separate ``RandomState`` is seeded from ``self.random_state`` for
        each individual position (stored in ``self.states``) and for each
        family before the population is created.
        """

        state_seeds = self.random_state.randint(
            np.iinfo(np.int32).max, size=self.size
        )
        self.states = {
            i: np.random.RandomState(seed)
            for i, seed in enumerate(state_seeds)
        }

        family_seeds = self.random_state.randint(
            np.iinfo(np.int32).max, size=len(self.families)
        )
        for family, seed in zip(self.families, family_seeds):
            family.random_state = np.random.RandomState(seed)

        self.population = create_initial_population(
            self.row_limits,
            self.col_limits,
            self.families,
            self.weights,
            self.states,
        )

        self.pop_fitness = get_population_fitness(
            self.population, self.fitness, processes, **fitness_kwargs
        )

    def _get_next_generation(self, processes, **kwargs):
        """Create the next population via selection, crossover and mutation,
        update the family subtypes and get the new population's fitness."""

        parents = selection(
            self.population,
            self.pop_fitness,
            self.best_prop,
            self.lucky_prop,
            self.random_state,
            self.maximise,
        )

        self._update_subtypes(parents)

        self.population = create_new_population(
            parents,
            self.population,
            self.crossover_prob,
            self.mutation_prob,
            self.row_limits,
            self.col_limits,
            self.families,
            self.weights,
            self.states,
        )

        self.pop_fitness = get_population_fitness(
            self.population, self.fitness, processes, **kwargs
        )

        if self.shrinkage is not None:
            self.families = shrink(
                parents, self.families, self.generation, self.shrinkage
            )

    def _update_pop_history(self):
        """ Add the current generation to the history. """

        self.pop_history.append(self.population)

    def _update_fit_history(self):
        """ Add the current generation's population fitness to the history. """

        fitness_df = pd.DataFrame(
            {
                "fitness": self.pop_fitness,
                "generation": self.generation,
                "individual": range(self.size),
            }
        )

        # ``DataFrame.append`` was removed in pandas 2.0; ``pd.concat`` is the
        # supported equivalent. The initial empty frame is left out so the
        # result keeps the dtypes of the real data.
        if self.fit_history.empty:
            frames = [fitness_df]
        else:
            frames = [self.fit_history, fitness_df]
        self.fit_history = pd.concat(frames, ignore_index=True)

    def _write_generation(self, root):
        """Write all individuals in a generation and their collective
        fitnesses to file at the generation's directory in `root`."""

        write_fitness(self.pop_fitness, self.generation, root)
        for idx, individual in enumerate(self.population):
            individual.to_file(f"{root}/{self.generation}/{idx}/", root)

    def _update_histories(self, root):
        """Update the population and fitness histories.

        The histories are kept in memory when ``root`` is ``None``; otherwise
        the current generation is written to file under ``root``.
        """

        if root is None:
            self._update_pop_history()
            self._update_fit_history()
        else:
            self._write_generation(root)

    def _get_current_subtypes(self, parents):
        """Get a dictionary mapping each family to all the subtype IDs that
        are present in the parents.

        Each ID is recorded once per family, in order of first appearance.
        """

        family_to_subtype_ids = defaultdict(list)
        for parent in parents:
            for pdf in parent.metadata:
                family = pdf.family
                subtype_id = pdf.subtype_id
                recorded_ids = family_to_subtype_ids[family]
                if subtype_id not in recorded_ids:
                    recorded_ids.append(subtype_id)

        return family_to_subtype_ids

    def _update_subtypes(self, parents):
        """Update the current subtypes for each family to be those present in
        the parents.

        Families that do not appear in any parent's metadata are left
        untouched.
        """

        current_subtypes = self._get_current_subtypes(parents)
        for family, current_ids in current_subtypes.items():
            family.subtypes = {
                subtype_id: family.all_subtypes[subtype_id]
                for subtype_id in current_ids
            }
def _get_pop_history(root, generation, distributions): """Read in the individuals from each generation. The dataset is given as a `dask.dataframe.core.DataFrame` but the metadata are recovered instances of their original class subtypes.""" pop_history = [] for gen in range(generation): population = [] gen_path = Path(f"{root}/{gen}") for ind_dir in sorted( gen_path.glob("*"), key=lambda path: int(path.stem) ): individual_dir = Path(ind_dir) individual = Individual.from_file( individual_dir, distributions, root, method="dask" ) population.append(individual) pop_history.append(population) return pop_history def _get_fit_history(root): """Read in the fitness history from each generation in a run as a `dask.dataframe.core.DataFrame`.""" return dd.read_csv(f"{root}/fitness.csv")