# Coverage for src/sensai/data/io_data.py: 57% (173 statements)
# coverage.py v7.6.1, created at 2024-08-13 22:17 +0000
import logging
import random
from abc import ABC, abstractmethod
from typing import Any, Tuple, Sequence, TypeVar, Generic

import numpy as np
import pandas as pd
import scipy.stats
from sklearn.model_selection import StratifiedShuffleSplit

from ..util.string import ToStringMixin

log = logging.getLogger(__name__)

T = TypeVar("T")


class BaseInputOutputData(Generic[T], ABC):
    def __init__(self, inputs: T, outputs: T):
        """
        :param inputs: expected to have shape and __len__
        :param outputs: expected to have shape and __len__
        """
        if len(inputs) != len(outputs):
            raise ValueError("Lengths do not match")
        self.inputs = inputs
        self.outputs = outputs

    def __len__(self):
        return len(self.inputs)

    @abstractmethod
    def filter_indices(self, indices: Sequence[int]) -> __qualname__:
        pass


class InputOutputArrays(BaseInputOutputData[np.ndarray]):
    def __init__(self, inputs: np.ndarray, outputs: np.ndarray):
        super().__init__(inputs, outputs)

    def filter_indices(self, indices: Sequence[int]) -> __qualname__:
        inputs = self.inputs[indices]
        outputs = self.outputs[indices]
        return InputOutputArrays(inputs, outputs)

    def to_torch_data_loader(self, batch_size=64, shuffle=True):
        try:
            import torch
            from torch.utils.data import DataLoader, TensorDataset
        except ImportError:
            raise ImportError("Could not import torch, did you install it?")
        dataset = TensorDataset(torch.tensor(self.inputs), torch.tensor(self.outputs))
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)
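
# A minimal usage sketch (illustrative, not part of the library): wrapping two
# NumPy arrays and, provided torch is installed, iterating over mini-batches.
# The shapes and the batch size are arbitrary examples.
#
#   X = np.random.rand(100, 3).astype(np.float32)
#   y = np.random.rand(100, 1).astype(np.float32)
#   arrays = InputOutputArrays(X, y)
#   subset = arrays.filter_indices([0, 1, 2])  # retain the first three data points
#   loader = arrays.to_torch_data_loader(batch_size=16)
#   for batch_inputs, batch_outputs in loader:
#       pass  # train on the mini-batch here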


class InputOutputData(BaseInputOutputData[pd.DataFrame], ToStringMixin):
    """
    Holds input and output data for learning problems.
    """
    def __init__(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        super().__init__(inputs, outputs)

    def _tostring_object_info(self) -> str:
        return f"N={len(self.inputs)}, numInputColumns={len(self.inputs.columns)}, numOutputColumns={len(self.outputs.columns)}"

    @classmethod
    def from_data_frame(cls, df: pd.DataFrame, *output_columns: str) -> "InputOutputData":
        """
        :param df: a data frame containing both input and output columns
        :param output_columns: the output column name(s)
        :return: an InputOutputData instance with inputs and outputs separated
        """
        inputs = df[[c for c in df.columns if c not in output_columns]]
        outputs = df[list(output_columns)]
        return cls(inputs, outputs)

    def filter_indices(self, indices: Sequence[int]) -> __qualname__:
        inputs = self.inputs.iloc[indices]
        outputs = self.outputs.iloc[indices]
        return InputOutputData(inputs, outputs)

    def filter_index(self, index_elements: Sequence[Any]) -> __qualname__:
        inputs = self.inputs.loc[index_elements]
        outputs = self.outputs.loc[index_elements]
        return InputOutputData(inputs, outputs)

    @property
    def input_dim(self):
        return self.inputs.shape[1]

    @property
    def output_dim(self):
        return self.outputs.shape[1]

    def compute_input_output_correlation(self):
        correlations = {}
        for output_col in self.outputs.columns:
            correlations[output_col] = {}
            output_series = self.outputs[output_col]
            for input_col in self.inputs.columns:
                input_series = self.inputs[input_col]
                pcc, pvalue = scipy.stats.pearsonr(input_series, output_series)
                correlations[output_col][input_col] = pcc
        return correlations

    def to_df(self) -> pd.DataFrame:
        return pd.concat((self.inputs, self.outputs), axis=1)
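
# A minimal usage sketch (illustrative; the column names are hypothetical):
# separating a single data frame into inputs and outputs.
#
#   df = pd.DataFrame({"x1": [1.0, 2.0, 3.0], "x2": [6.0, 5.0, 4.0], "y": [0.0, 1.0, 0.0]})
#   io_data = InputOutputData.from_data_frame(df, "y")
#   assert io_data.input_dim == 2 and io_data.output_dim == 1
#   correlations = io_data.compute_input_output_correlation()  # {"y": {"x1": ..., "x2": ...}}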


TInputOutputData = TypeVar("TInputOutputData", bound=BaseInputOutputData)


class DataSplitter(ABC, Generic[TInputOutputData]):
    @abstractmethod
    def split(self, data: TInputOutputData) -> Tuple[TInputOutputData, TInputOutputData]:
        pass


class DataSplitterFractional(DataSplitter):
    def __init__(self, fractional_size_of_first_set: float, shuffle=True, random_seed=42):
        if not 0 <= fractional_size_of_first_set <= 1:
            raise ValueError(f"invalid fraction: {fractional_size_of_first_set}")
        self.fractionalSizeOfFirstSet = fractional_size_of_first_set
        self.shuffle = shuffle
        self.randomSeed = random_seed

    def split_with_indices(self, data) -> Tuple[Tuple[Sequence[int], Sequence[int]], Tuple[TInputOutputData, TInputOutputData]]:
        num_data_points = len(data)
        split_index = int(num_data_points * self.fractionalSizeOfFirstSet)
        if self.shuffle:
            rand = np.random.RandomState(self.randomSeed)
            indices = rand.permutation(num_data_points)
        else:
            indices = range(num_data_points)
        indices_a = indices[:split_index]
        indices_b = indices[split_index:]
        a = data.filter_indices(list(indices_a))
        b = data.filter_indices(list(indices_b))
        return (indices_a, indices_b), (a, b)

    def split(self, data: TInputOutputData) -> Tuple[TInputOutputData, TInputOutputData]:
        _, (a, b) = self.split_with_indices(data)
        return a, b
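
# A minimal usage sketch (illustrative): an 80/20 shuffled split of the
# hypothetical io_data instance from the example above.
#
#   train_data, test_data = DataSplitterFractional(0.8).split(io_data)
#   assert len(train_data) + len(test_data) == len(io_data)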


class DataSplitterFromDataFrameSplitter(DataSplitter[InputOutputData]):
    """
    Creates a DataSplitter from a DataFrameSplitter, which can be applied either to the input or the output data.
    It supports only InputOutputData, not other subclasses of BaseInputOutputData.
    """
    def __init__(self, data_frame_splitter: "DataFrameSplitter", fractional_size_of_first_set: float, apply_to_input=True):
        """
        :param data_frame_splitter: the splitter to apply
        :param fractional_size_of_first_set: the desired fractional size of the first set when applying the splitter
        :param apply_to_input: if True, apply the splitter to the input data frame; if False, apply it to the output data frame
        """
        self.dataFrameSplitter = data_frame_splitter
        self.fractionalSizeOfFirstSet = fractional_size_of_first_set
        self.applyToInput = apply_to_input

    def split(self, data: InputOutputData) -> Tuple[InputOutputData, InputOutputData]:
        if not isinstance(data, InputOutputData):
            raise ValueError(f"{self} is only applicable to instances of {InputOutputData.__name__}, got {data}")
        df = data.inputs if self.applyToInput else data.outputs
        indices_a, indices_b = self.dataFrameSplitter.compute_split_indices(df, self.fractionalSizeOfFirstSet)
        a = data.filter_indices(list(indices_a))
        b = data.filter_indices(list(indices_b))
        return a, b
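
# A minimal usage sketch (illustrative; "group_id" is a hypothetical input
# column): lifting a DataFrameSplitter to a DataSplitter so that rows sharing
# the same group_id value are never separated across the two resulting sets.
#
#   splitter = DataSplitterFromDataFrameSplitter(
#       DataFrameSplitterColumnEquivalenceClass("group_id"), 0.8)
#   train_data, test_data = splitter.split(io_data)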


class DataSplitterFromSkLearnSplitter(DataSplitter):
    def __init__(self, sklearn_splitter):
        """
        :param sklearn_splitter: an instance of one of the splitter classes from sklearn.model_selection,
            see https://scikit-learn.org/stable/modules/classes.html#module-sklearn.model_selection
        """
        self.sklearn_splitter = sklearn_splitter

    def split(self, data: TInputOutputData) -> Tuple[TInputOutputData, TInputOutputData]:
        splitter_result = self.sklearn_splitter.split(data.inputs, data.outputs)
        split = next(iter(splitter_result))
        first_indices, second_indices = split
        return data.filter_indices(first_indices), data.filter_indices(second_indices)


class DataSplitterStratifiedShuffleSplit(DataSplitterFromSkLearnSplitter):
    def __init__(self, fractional_size_of_first_set: float, random_seed=42):
        super().__init__(StratifiedShuffleSplit(n_splits=1, train_size=fractional_size_of_first_set, random_state=random_seed))

    @staticmethod
    def is_applicable(io_data: InputOutputData):
        class_counts = io_data.outputs.value_counts()
        return all(class_counts >= 2)
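
# A minimal usage sketch (illustrative): a stratified split for a
# classification problem. is_applicable guards against classes with fewer
# than two instances, for which StratifiedShuffleSplit raises an error.
#
#   if DataSplitterStratifiedShuffleSplit.is_applicable(io_data):
#       train_data, test_data = DataSplitterStratifiedShuffleSplit(0.8).split(io_data)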


class DataFrameSplitter(ABC):
    @abstractmethod
    def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[Sequence[int], Sequence[int]]:
        pass

    @staticmethod
    def split_with_indices(df: pd.DataFrame, indices_pair: Tuple[Sequence[int], Sequence[int]]) -> Tuple[pd.DataFrame, pd.DataFrame]:
        indices_a, indices_b = indices_pair
        a = df.iloc[indices_a]
        b = df.iloc[indices_b]
        return a, b

    def split(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[pd.DataFrame, pd.DataFrame]:
        return self.split_with_indices(df, self.compute_split_indices(df, fractional_size_of_first_set))


class DataFrameSplitterFractional(DataFrameSplitter):
    def __init__(self, shuffle=False, random_seed=42):
        self.randomSeed = random_seed
        self.shuffle = shuffle

    def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[Sequence[int], Sequence[int]]:
        n = df.shape[0]
        size_a = int(n * fractional_size_of_first_set)
        if self.shuffle:
            rand = np.random.RandomState(self.randomSeed)
            indices = rand.permutation(n)
        else:
            indices = list(range(n))
        indices_a = indices[:size_a]
        indices_b = indices[size_a:]
        return indices_a, indices_b
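
# A minimal usage sketch (illustrative): splitting a raw data frame directly;
# with the default shuffle=False, the original row order is preserved and the
# first set is simply the leading 80% of rows.
#
#   df_a, df_b = DataFrameSplitterFractional().split(df, 0.8)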


class DataFrameSplitterColumnEquivalenceClass(DataFrameSplitter):
    """
    Performs a split that keeps together data points/rows that have the same value in a given column, i.e.
    with respect to that column, the items having the same values are viewed as a unit; they form an equivalence class, and all
    data points belonging to the same class are either in the first set or the second set.

    The split is performed at the level of unique items in the column, i.e. the given fraction of equivalence
    classes will end up in the first set and the rest in the second set.

    The list of unique items in the column can be shuffled before applying the split. If no shuffling is applied,
    the original order in the data frame is maintained, and if the items were grouped by equivalence class in the
    original data frame, the split will correspond to a fractional split without shuffling where the split boundary
    is adjusted to not separate an equivalence class.
    """
    def __init__(self, column: str, shuffle=True, random_seed=42):
        """
        :param column: the column which defines the equivalence classes (groups of data points/rows that must not be separated)
        :param shuffle: whether to shuffle the list of unique values in the given column before applying the split
        :param random_seed: the random seed to use when shuffling
        """
        self.column = column
        self.shuffle = shuffle
        self.random_seed = random_seed

    def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[Sequence[int], Sequence[int]]:
        values = list(df[self.column].unique())
        if self.shuffle:
            rng = random.Random(self.random_seed)
            rng.shuffle(values)

        num_items_in_first_set = round(fractional_size_of_first_set * len(values))
        first_set_values = set(values[:num_items_in_first_set])

        first_set_indices = []
        second_set_indices = []
        for i, t in enumerate(df.itertuples()):
            if getattr(t, self.column) in first_set_values:
                first_set_indices.append(i)
            else:
                second_set_indices.append(i)
        return first_set_indices, second_set_indices
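
# A minimal usage sketch (illustrative; "user" is a hypothetical column): all
# rows belonging to the same user end up on the same side of the split, so the
# split never separates a user's data points.
#
#   df = pd.DataFrame({"user": ["a", "a", "b", "b", "c"], "value": [1, 2, 3, 4, 5]})
#   df_a, df_b = DataFrameSplitterColumnEquivalenceClass("user").split(df, 0.5)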