import logging
import random
from abc import ABC, abstractmethod
from typing import Tuple, Sequence, TypeVar, Generic
import numpy as np
import pandas as pd
import scipy.stats
from sklearn.model_selection import StratifiedShuffleSplit
from ..util.string import ToStringMixin
# Logger for this module, named after the module's import path
log = logging.getLogger(__name__)
# Unconstrained generic type variable
T = TypeVar("T")
# Type variable for the kind of data a splitter operates on; bound to BaseInputOutputData.
# NOTE(review): BaseInputOutputData is neither imported nor defined in the visible part of this file -
# confirm that it is in scope at this point.
TInputOutputData = TypeVar("TInputOutputData", bound=BaseInputOutputData)
class DataSplitter(ABC, Generic[TInputOutputData]):
    """
    Abstract base class for strategies that divide a data set into two parts.
    """

    @abstractmethod
    def split(self, data: TInputOutputData) -> Tuple[TInputOutputData, TInputOutputData]:
        """
        Splits the given data into two parts.

        :param data: the data to split
        :return: a pair (first set, second set)
        """
class DataSplitterFractional(DataSplitter):
    """
    Splits data into two parts such that the first part contains a given fraction of the data points
    (rounded down to a whole number of data points), optionally after shuffling the data point indices
    with a seeded random number generator.
    """

    def __init__(self, fractional_size_of_first_set: float, shuffle=True, random_seed=42):
        """
        :param fractional_size_of_first_set: the fraction of the data points, in [0, 1], to place in the first set
        :param shuffle: whether to randomly permute the data point indices before applying the split
        :param random_seed: the seed of the random number generator used for shuffling
        :raises ValueError: if the given fraction is not in [0, 1]
        """
        if not 0 <= fractional_size_of_first_set <= 1:
            # ValueError (a subclass of Exception) is raised instead of a bare Exception, such that the
            # error can be handled specifically; handlers catching Exception continue to work
            raise ValueError(f"invalid fraction: {fractional_size_of_first_set}")
        self.fractionalSizeOfFirstSet = fractional_size_of_first_set
        self.shuffle = shuffle
        self.randomSeed = random_seed

    def split_with_indices(self, data) -> Tuple[Tuple[Sequence[int], Sequence[int]], Tuple[TInputOutputData, TInputOutputData]]:
        """
        Splits the data and additionally returns the indices that were used to create the two parts.

        :param data: the data to split; it must support ``len`` and provide a method ``filter_indices``
        :return: a pair ((indices_a, indices_b), (a, b)), where indices_a and indices_b are the positions
            (in the range from 0 to len(data)-1) of the data points that make up the parts a and b respectively
        """
        num_data_points = len(data)
        # int() truncates, i.e. the size of the first set is rounded down
        split_index = int(num_data_points * self.fractionalSizeOfFirstSet)
        if self.shuffle:
            # a new generator is created for every call, such that the same seed always yields the same permutation
            rand = np.random.RandomState(self.randomSeed)
            indices = rand.permutation(num_data_points)
        else:
            indices = range(num_data_points)
        indices_a = indices[:split_index]
        indices_b = indices[split_index:]
        a = data.filter_indices(list(indices_a))
        b = data.filter_indices(list(indices_b))
        return (indices_a, indices_b), (a, b)

    def split(self, data: TInputOutputData) -> Tuple[TInputOutputData, TInputOutputData]:
        """
        Splits the data into two parts according to the configured fraction.

        :param data: the data to split
        :return: a pair (first set, second set)
        """
        _, (a, b) = self.split_with_indices(data)
        return a, b
class DataSplitterFromDataFrameSplitter(DataSplitter[InputOutputData]):
    """
    Adapter which turns a DataFrameSplitter into a DataSplitter by applying it to either the input
    or the output data frame of the data to be split.
    Only instances of InputOutputData are supported (no other subclasses of BaseInputOutputData).
    """

    def __init__(self, data_frame_splitter: "DataFrameSplitter", fractional_size_of_first_set: float, apply_to_input=True):
        """
        :param data_frame_splitter: the data frame splitter which computes the split indices
        :param fractional_size_of_first_set: the fractional size of the first set that is requested from the splitter
        :param apply_to_input: whether the splitter is applied to the input data frame (True) or
            to the output data frame (False)
        """
        self.applyToInput = apply_to_input
        self.fractionalSizeOfFirstSet = fractional_size_of_first_set
        self.dataFrameSplitter = data_frame_splitter

    def split(self, data: InputOutputData) -> Tuple[InputOutputData, InputOutputData]:
        """
        Splits the data using the indices computed by the data frame splitter.

        :param data: the data to split
        :return: a pair (first set, second set)
        :raises ValueError: if the data is not an instance of InputOutputData
        """
        if not isinstance(data, InputOutputData):
            raise ValueError(f"{self} is only applicable to instances of {InputOutputData.__name__}, got {data}")
        # select the data frame on which the split indices are to be computed
        if self.applyToInput:
            source_df = data.inputs
        else:
            source_df = data.outputs
        first_indices, second_indices = self.dataFrameSplitter.compute_split_indices(source_df, self.fractionalSizeOfFirstSet)
        first_part = data.filter_indices(list(first_indices))
        second_part = data.filter_indices(list(second_indices))
        return first_part, second_part
class DataSplitterFromSkLearnSplitter(DataSplitter):
    """
    Data splitter which delegates the computation of the split indices to a scikit-learn splitter object.
    """

    def __init__(self, sklearn_splitter):
        """
        :param sklearn_splitter: an instance of one of the splitter classes from sklearn.model_selection,
            see https://scikit-learn.org/stable/modules/classes.html#module-sklearn.model_selection
        """
        self.sklearn_splitter = sklearn_splitter

    def split(self, data: TInputOutputData) -> Tuple[TInputOutputData, TInputOutputData]:
        """
        Splits the data according to the first split that is generated by the scikit-learn splitter.

        :param data: the data to split; its ``inputs`` and ``outputs`` are passed to the splitter
        :return: a pair (first set, second set)
        """
        generated_splits = iter(self.sklearn_splitter.split(data.inputs, data.outputs))
        # only the first generated split is used; any further splits are ignored
        first_indices, second_indices = next(generated_splits)
        first_part = data.filter_indices(first_indices)
        second_part = data.filter_indices(second_indices)
        return first_part, second_part
class DataSplitterStratifiedShuffleSplit(DataSplitterFromSkLearnSplitter):
    """
    Data splitter which uses scikit-learn's StratifiedShuffleSplit (configured to generate a single split).
    """

    def __init__(self, fractional_size_of_first_set: float, random_seed=42):
        """
        :param fractional_size_of_first_set: the fractional size of the first set (passed as ``train_size``)
        :param random_seed: the random seed (passed as ``random_state``)
        """
        stratified_splitter = StratifiedShuffleSplit(n_splits=1, train_size=fractional_size_of_first_set,
            random_state=random_seed)
        super().__init__(stratified_splitter)

    @staticmethod
    def is_applicable(io_data: InputOutputData):
        """
        Checks whether every distinct value in the outputs of the given data occurs at least twice.
        NOTE(review): presumably this is the precondition for a stratified split being possible - confirm.

        :param io_data: the data to check
        :return: True if no distinct output value occurs fewer than two times, False otherwise
        """
        occurrences = io_data.outputs.value_counts()
        occurs_at_least_twice = occurrences >= 2
        return all(occurs_at_least_twice)
class DataFrameSplitter(ABC):
    """
    Abstract base class for strategies that divide the rows of a data frame into two sets.
    """

    @abstractmethod
    def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[Sequence[int], Sequence[int]]:
        """
        Computes the row positions that make up the two sets.

        :param df: the data frame to split
        :param fractional_size_of_first_set: the desired fractional size of the first set
        :return: a pair (positions of the first set, positions of the second set)
        """

    @staticmethod
    def split_with_indices(df: pd.DataFrame, indices_pair: Tuple[Sequence[int], Sequence[int]]) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Selects the rows of the two sets from the data frame by position (via ``iloc``).

        :param df: the data frame to split
        :param indices_pair: a pair (positions of the first set, positions of the second set)
        :return: a pair of data frames (first set, second set)
        """
        first_positions, second_positions = indices_pair
        return df.iloc[first_positions], df.iloc[second_positions]

    def split(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Splits the data frame into two data frames using the positions given by :meth:`compute_split_indices`.

        :param df: the data frame to split
        :param fractional_size_of_first_set: the desired fractional size of the first set
        :return: a pair of data frames (first set, second set)
        """
        indices_pair = self.compute_split_indices(df, fractional_size_of_first_set)
        return self.split_with_indices(df, indices_pair)
class DataFrameSplitterFractional(DataFrameSplitter):
    """
    Splits a data frame such that the first set contains the given fraction of the rows (rounded down),
    optionally after shuffling the row positions with a seeded random number generator.
    """

    def __init__(self, shuffle=False, random_seed=42):
        """
        :param shuffle: whether to randomly permute the row positions before applying the split
        :param random_seed: the seed of the random number generator used for shuffling
        """
        self.shuffle = shuffle
        self.randomSeed = random_seed

    def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[Sequence[int], Sequence[int]]:
        """
        Computes the row positions of the two sets.

        :param df: the data frame to split
        :param fractional_size_of_first_set: the fraction of rows to place in the first set
        :return: a pair (positions of the first set, positions of the second set); the positions are
            slices of a numpy array if shuffling is enabled and slices of a list otherwise
        """
        num_rows = len(df)
        # int() truncates, i.e. the size of the first set is rounded down
        boundary = int(num_rows * fractional_size_of_first_set)
        if not self.shuffle:
            ordering = list(range(num_rows))
        else:
            ordering = np.random.RandomState(self.randomSeed).permutation(num_rows)
        return ordering[:boundary], ordering[boundary:]
class DataFrameSplitterColumnEquivalenceClass(DataFrameSplitter):
    """
    Performs a split that keeps together data points/rows that have the same value in a given column, i.e.
    with respect to that column, the items having the same values are viewed as a unit; they form an equivalence class, and all
    data points belonging to the same class are either in the first set or the second set.
    The split is performed at the level of unique items in the column, i.e. the given fraction of equivalence
    classes will end up in the first set and the rest in the second set.
    The list of unique items in the column can be shuffled before applying the split. If no shuffling is applied,
    the original order in the data frame is maintained, and if the items were grouped by equivalence class in the
    original data frame, the split will correspond to a fractional split without shuffling where the split boundary
    is adjusted to not separate an equivalence class.
    """

    def __init__(self, column: str, shuffle=True, random_seed=42):
        """
        :param column: the column which defines the equivalence classes (groups of data points/rows that must not be separated)
        :param shuffle: whether to shuffle the list of unique values in the given column before applying the split
        :param random_seed: the seed of the random number generator used for shuffling
        """
        self.column = column
        self.shuffle = shuffle
        self.random_seed = random_seed

    def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[Sequence[int], Sequence[int]]:
        """
        Computes the row positions of the two sets such that rows with the same value in the
        configured column end up in the same set.

        :param df: the data frame to split; it must contain the configured column
        :param fractional_size_of_first_set: the fraction of unique column values (not of rows) whose
            rows are assigned to the first set; the resulting number of values is rounded using ``round``
        :return: a pair (positions of the first set, positions of the second set), each a list of
            row positions in ascending order
        """
        column_values = df[self.column]
        values = list(column_values.unique())
        if self.shuffle:
            rng = random.Random(self.random_seed)
            rng.shuffle(values)
        num_items_in_first_set = round(fractional_size_of_first_set * len(values))
        first_set_values = set(values[:num_items_in_first_set])
        first_set_indices = []
        second_set_indices = []
        # The values of the column are iterated directly instead of accessing them as attributes of the
        # tuples generated by df.itertuples(): itertuples renames fields whose column names are not valid
        # Python identifiers (e.g. names containing spaces), repeated or starting with an underscore, and
        # names the index field "Index", so attribute access by column name is not reliable
        for i, value in enumerate(column_values):
            if value in first_set_values:
                first_set_indices.append(i)
            else:
                second_set_indices.append(i)
        return first_set_indices, second_set_indices