import logging
import math
import random
from abc import ABC, abstractmethod
from typing import Tuple, Sequence, TypeVar, Generic, Optional, Union
import numpy as np
import pandas as pd
import scipy.stats
from sklearn.model_selection import StratifiedShuffleSplit
from ..util.pickle import setstate
from ..util.string import ToStringMixin
# Module-level logger, named after this module
log = logging.getLogger(__name__)
# Unconstrained generic type variable
T = TypeVar("T")
# Type variable for the data set type that a DataSplitter operates on; restricted to BaseInputOutputData and its subclasses.
# NOTE(review): BaseInputOutputData is neither imported nor defined in the visible part of this file -
# confirm that it is declared before this line.
TInputOutputData = TypeVar("TInputOutputData", bound=BaseInputOutputData)
class DataSplitter(ABC, Generic[TInputOutputData]):
    """
    Abstract base class for strategies that divide a data set into two parts.
    """

    @abstractmethod
    def split(self, data: TInputOutputData) -> Tuple[TInputOutputData, TInputOutputData]:
        """
        Divides the given data set into two parts.

        :param data: the data set to split
        :return: a pair consisting of the first and the second part
        """
class DataSplitterFractional(DataSplitter):
    """
    Splits a data set into two parts such that the first part receives a given fraction of the data points,
    optionally after a seeded random permutation of the data points.
    """

    def __init__(self, fractional_size_of_first_set: float, shuffle=True, random_seed=42):
        """
        :param fractional_size_of_first_set: the fraction of the data points, in [0, 1], that shall make up the first set
        :param shuffle: whether to randomly permute the data points before applying the split
        :param random_seed: the seed of the random number generator that is used if `shuffle` is enabled
        :raises ValueError: if the given fraction is not within [0, 1]
        """
        if not 0 <= fractional_size_of_first_set <= 1:
            # ValueError is a subclass of Exception, so handlers written for the former generic exception still apply
            raise ValueError(f"invalid fraction: {fractional_size_of_first_set}")
        self.fractionalSizeOfFirstSet = fractional_size_of_first_set
        self.shuffle = shuffle
        self.randomSeed = random_seed

    def split_with_indices(self, data: TInputOutputData) \
            -> Tuple[Tuple[Sequence[int], Sequence[int]], Tuple[TInputOutputData, TInputOutputData]]:
        """
        Splits the data and additionally reports which positions went into which part.

        :param data: the data set to split; it must support `len` and provide a `filter_indices` method
        :return: a pair `((indices_a, indices_b), (a, b))` where `indices_a`/`indices_b` are the positions assigned to the
            first/second set and `a`/`b` are the corresponding data sets obtained via `data.filter_indices`
        """
        num_data_points = len(data)
        # the size of the first set is truncated towards zero
        split_index = int(num_data_points * self.fractionalSizeOfFirstSet)
        if self.shuffle:
            # a fresh generator is created for every call, such that repeated calls yield the same split
            rand = np.random.RandomState(self.randomSeed)
            indices = rand.permutation(num_data_points)
        else:
            indices = range(num_data_points)
        indices_a = indices[:split_index]
        indices_b = indices[split_index:]
        a = data.filter_indices(list(indices_a))
        b = data.filter_indices(list(indices_b))
        return (indices_a, indices_b), (a, b)

    def split(self, data: TInputOutputData) -> Tuple[TInputOutputData, TInputOutputData]:
        """
        :param data: the data set to split
        :return: a pair consisting of the first and the second set
        """
        _, (a, b) = self.split_with_indices(data)
        return a, b
class DataSplitterFromDataFrameSplitter(DataSplitter[InputOutputData]):
    """
    Adapter which turns a DataFrameSplitter into a DataSplitter; the wrapped splitter is applied either to the
    input data frame or to the output data frame.
    Only InputOutputData is supported, i.e. other subclasses of BaseInputOutputData are rejected.
    """

    def __init__(self, data_frame_splitter: "DataFrameSplitter", fractional_size_of_first_set: float, apply_to_input=True):
        """
        :param data_frame_splitter: the data frame splitter which determines the split indices
        :param fractional_size_of_first_set: the fractional size of the first set that is requested from the splitter
        :param apply_to_input: whether the splitter shall operate on the input data frame (True) or on the
            output data frame (False)
        """
        self.applyToInput = apply_to_input
        self.fractionalSizeOfFirstSet = fractional_size_of_first_set
        self.dataFrameSplitter = data_frame_splitter

    def split(self, data: InputOutputData) -> Tuple[InputOutputData, InputOutputData]:
        """
        :param data: the data set to split
        :return: a pair consisting of the first and the second set
        :raises ValueError: if `data` is not an instance of InputOutputData
        """
        if not isinstance(data, InputOutputData):
            raise ValueError(f"{self} is only applicable to instances of {InputOutputData.__name__}, got {data}")
        # select the data frame on which the split indices are computed
        if self.applyToInput:
            frame = data.inputs
        else:
            frame = data.outputs
        first_indices, second_indices = self.dataFrameSplitter.compute_split_indices(frame, self.fractionalSizeOfFirstSet)
        first_set = data.filter_indices(list(first_indices))
        second_set = data.filter_indices(list(second_indices))
        return first_set, second_set
class DataSplitterFromSkLearnSplitter(DataSplitter):
    """
    Data splitter which delegates the computation of the split indices to a splitter object from sklearn.
    """

    def __init__(self, sklearn_splitter):
        """
        :param sklearn_splitter: an instance of one of the splitter classes from sklearn.model_selection,
            see https://scikit-learn.org/stable/modules/classes.html#module-sklearn.model_selection
        """
        self.sklearn_splitter = sklearn_splitter

    def split(self, data: TInputOutputData) -> Tuple[TInputOutputData, TInputOutputData]:
        """
        :param data: the data set to split; its `inputs` and `outputs` are passed on to the sklearn splitter
        :return: a pair consisting of the first and the second set
        """
        all_splits = self.sklearn_splitter.split(data.inputs, data.outputs)
        # only the first split generated by the sklearn splitter is used
        first_indices, second_indices = next(iter(all_splits))
        first_set = data.filter_indices(first_indices)
        second_set = data.filter_indices(second_indices)
        return first_set, second_set
class DataSplitterStratifiedShuffleSplit(DataSplitterFromSkLearnSplitter):
    """
    Data splitter which applies a single split generated by sklearn's StratifiedShuffleSplit.
    """

    def __init__(self, fractional_size_of_first_set: float, random_seed=42):
        """
        :param fractional_size_of_first_set: the value passed to StratifiedShuffleSplit as `train_size`
        :param random_seed: the value passed to StratifiedShuffleSplit as `random_state`
        """
        sklearn_splitter = StratifiedShuffleSplit(n_splits=1, train_size=fractional_size_of_first_set,
            random_state=random_seed)
        super().__init__(sklearn_splitter)

    @staticmethod
    def is_applicable(io_data: InputOutputData):
        """
        :param io_data: the data set to check
        :return: True if every distinct value in the outputs of the data set occurs at least twice, False otherwise
        """
        return all(count >= 2 for count in io_data.outputs.value_counts())
class DataFrameSplitter(ABC):
    """
    Abstract base class for strategies that divide the rows of a data frame into two sets.
    """

    @abstractmethod
    def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[Sequence[int], Sequence[int]]:
        """
        Determines which rows shall go into which of the two sets.

        :param df: the data frame to split
        :param fractional_size_of_first_set: the desired fractional size of the first set
        :return: a pair of sequences of positional row indices for the first and the second set respectively
        """

    @staticmethod
    def split_with_indices(df: pd.DataFrame, indices_pair: Tuple[Sequence[int], Sequence[int]]) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Applies a previously computed pair of index sequences to a data frame.

        :param df: the data frame to split
        :param indices_pair: a pair of sequences of positional row indices (as used by `iloc`)
        :return: a pair of data frames containing the rows selected by the first and second index sequence respectively
        """
        first_indices, second_indices = indices_pair
        return df.iloc[first_indices], df.iloc[second_indices]

    def split(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Computes the split indices for the given data frame and applies them.

        :param df: the data frame to split
        :param fractional_size_of_first_set: the desired fractional size of the first set
        :return: a pair of data frames (first set, second set)
        """
        indices_pair = self.compute_split_indices(df, fractional_size_of_first_set)
        return self.split_with_indices(df, indices_pair)
class DataFrameSplitterFractional(DataFrameSplitter):
    """
    Splits the rows of a data frame at a position determined by the requested fraction, optionally after
    applying a seeded random permutation to the row positions.
    """

    def __init__(self, shuffle=False, random_seed=42):
        """
        :param shuffle: whether to randomly permute the row positions before applying the split
        :param random_seed: the seed of the random number generator that is used if `shuffle` is enabled
        """
        self.shuffle = shuffle
        self.randomSeed = random_seed

    def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[Sequence[int], Sequence[int]]:
        """
        :param df: the data frame to split
        :param fractional_size_of_first_set: the fraction of rows that shall go into the first set; the resulting
            number of rows is truncated towards zero
        :return: a pair of sequences of positional row indices for the first and the second set respectively
        """
        num_rows = len(df)
        split_position = int(num_rows * fractional_size_of_first_set)
        if not self.shuffle:
            row_order = list(range(num_rows))
        else:
            # a fresh generator is created for every call, such that repeated calls yield the same permutation
            row_order = np.random.RandomState(self.randomSeed).permutation(num_rows)
        return row_order[:split_position], row_order[split_position:]
class DataFrameSplitterColumnEquivalenceClass(DataFrameSplitter):
    """
    Performs a split that keeps together data points/rows that have the same value in a given column, i.e.
    with respect to that column, the items having the same values are viewed as a unit; they form an equivalence class, and all
    data points belonging to the same class are either in the first set or the second set.

    The split is performed at the level of unique items in the column, i.e. the given fraction of equivalence
    classes will end up in the first set and the rest in the second set.

    The list of unique items in the column can be shuffled before applying the split. If no shuffling is applied,
    the original order in the data frame is maintained, and if the items were grouped by equivalence class in the
    original data frame, the split will correspond to a fractional split without shuffling where the split boundary
    is adjusted to not separate an equivalence class.
    """

    def __init__(self, column: str, shuffle=True, random_seed=42):
        """
        :param column: the column which defines the equivalence classes (groups of data points/rows that must not be separated)
        :param shuffle: whether to shuffle the list of unique values in the given column before applying the split
        :param random_seed: the seed of the random number generator that is used if `shuffle` is enabled
        """
        self.column = column
        self.shuffle = shuffle
        self.random_seed = random_seed

    def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[Sequence[int], Sequence[int]]:
        """
        :param df: the data frame to split; it must contain the column given at construction
        :param fractional_size_of_first_set: the fraction of equivalence classes (unique column values) that shall
            go into the first set; the resulting number of classes is determined via `round`
        :return: a pair of lists of positional row indices for the first and the second set respectively
        """
        values = list(df[self.column].unique())
        if self.shuffle:
            rng = random.Random(self.random_seed)
            rng.shuffle(values)
        num_items_in_first_set = round(fractional_size_of_first_set * len(values))
        first_set_values = values[:num_items_in_first_set]
        # The membership test is applied to the column itself rather than to the attributes of row tuples:
        # this works for arbitrary column names (including names that are not valid Python identifiers)
        # and treats missing values (NaN) as one regular equivalence class.
        is_in_first_set = df[self.column].isin(first_set_values).to_numpy(dtype=bool)
        first_set_indices = np.flatnonzero(is_in_first_set).tolist()
        second_set_indices = np.flatnonzero(~is_in_first_set).tolist()
        return first_set_indices, second_set_indices
class DataPointWeighting(ABC):
    """
    Abstract base class for strategies that compute a weight for every data point of a data set.
    """

    @abstractmethod
    def compute_weights(self, x: pd.DataFrame, y: pd.DataFrame) -> pd.Series:
        """
        :param x: the data frame containing the inputs
        :param y: the data frame containing the outputs
        :return: a series containing the weights
        """
class DataPointWeightingRegressionTargetIntervalTotalWeight(DataPointWeighting):
    """
    Based on relative weights specified for intervals of the regression target,
    will assign individual data point weights such that the sum of weights of data points within each interval
    satisfies the user-specified relative weight, while ensuring that the total weight of all data points
    is still equal to the number of data points.
    (If an interval contains no data points at all, its share of the total weight is not assigned to any data point.)

    For example, if one specifies `intervals_weights` as [(0.5, 1), (inf, 2)], then the data points with target values
    up to 0.5 will get 1/3 of the weight and the remaining data points will get 2/3 of the weight.
    So if there are 100 data points and 50 of them are in the first interval (up to 0.5), then these 50 data points
    will each get weight 1/3*100/50=2/3 and the remaining 50 data points will each get weight 2/3*100/50=4/3.
    The sum of all weights is the number of data points, i.e. 100.

    Example:

    >>> targets = [0.1, 0.2, 0.5, 0.7, 0.8, 0.6]
    >>> x = pd.DataFrame({"foo": np.zeros(len(targets))})
    >>> y = pd.DataFrame({"target": targets})
    >>> weighting = DataPointWeightingRegressionTargetIntervalTotalWeight([(0.5, 1), (1.0, 2)])
    >>> weights = weighting.compute_weights(x, y)
    >>> assert(np.isclose(weights.sum(), len(y)))
    >>> weights.tolist()
    [0.6666666666666666,
     0.6666666666666666,
     0.6666666666666666,
     1.3333333333333333,
     1.3333333333333333,
     1.3333333333333333]
    """

    def __init__(self, intervals_weights: Sequence[Tuple[float, float]]):
        """
        :param intervals_weights: a sequence of tuples (upper_bound, rel_total_weight) where upper_bound is the upper bound
            of the interval, `(lower_bound, upper_bound]`; `lower_bound` is the upper bound of the preceding interval
            or -inf for the first interval. `rel_total_weight` specifies the relative weight of all data points within
            the interval.
        """
        # materialise the input, because it is traversed twice (a one-shot iterable would otherwise be exhausted)
        intervals_weights = list(intervals_weights)
        sum_rel_weights = sum(rel_weight for _, rel_weight in intervals_weights)
        self.intervals = []
        lower_bound = -math.inf
        for upper_bound, rel_weight in intervals_weights:
            self.intervals.append(self.Interval(lower_bound, upper_bound, rel_weight / sum_rel_weights))
            # the upper bound of this interval is the (exclusive) lower bound of the next one
            lower_bound = upper_bound

    class Interval:
        """
        Represents the half-open interval `(a, b]` along with the fraction of the total weight assigned to it.
        """

        def __init__(self, a: float, b: float, weight_fraction: float):
            """
            :param a: the (exclusive) lower bound
            :param b: the (inclusive) upper bound
            :param weight_fraction: the fraction of the total weight that is assigned to the interval
            """
            self.a = a
            self.b = b
            self.weight_fraction = weight_fraction

        def contains(self, x: float):
            """
            :param x: the value to check
            :return: True if `a < x <= b`, False otherwise
            """
            return self.a < x <= self.b

    def compute_weights(self, x: pd.DataFrame, y: pd.DataFrame) -> pd.Series:
        """
        :param x: the data frame containing the inputs; it determines the number of data points and the index of the result
        :param y: the data frame containing the regression targets; it must have exactly one column
        :return: a series of weights which uses the index of `x`
        :raises ValueError: if the number of target values covered by the intervals differs from the number of data points
        """
        assert len(y.columns) == 1, f"Only a single regression target is supported {self.__class__.__name__}"
        targets = y.iloc[:, 0]
        n = len(x)
        weights = np.zeros(n)
        num_weighted = 0
        for interval in self.intervals:
            # vectorised membership test for (a, b]; the result is always boolean, also for empty input
            mask = ((targets > interval.a) & (targets <= interval.b)).to_numpy(dtype=bool)
            subset_size = int(mask.sum())
            if subset_size == 0:
                # nothing to weight in this interval (and avoids a division by zero)
                continue
            num_weighted += subset_size
            weights[mask] = interval.weight_fraction * n / subset_size
        if num_weighted != n:
            # ValueError is a subclass of Exception, so handlers written for the former generic exception still apply
            raise ValueError("Not all data points were weighted. Most likely, the intervals do not cover the entire range of targets")
        return pd.Series(weights, index=x.index)