Coverage for src/sensai/data/io_data.py: 52% (226 statements)

import logging
import math
import random
from abc import ABC, abstractmethod
from typing import Any, Tuple, Sequence, TypeVar, Generic, Optional, Union

import numpy as np
import pandas as pd
import scipy.stats
from sklearn.model_selection import StratifiedShuffleSplit

from ..util.pickle import setstate
from ..util.string import ToStringMixin

log = logging.getLogger(__name__)

T = TypeVar("T")


class BaseInputOutputData(Generic[T], ABC):
    def __init__(self, inputs: T, outputs: T):
        """
        :param inputs: expected to have shape and __len__
        :param outputs: expected to have shape and __len__
        """
        if len(inputs) != len(outputs):
            raise ValueError("Lengths do not match")
        self.inputs = inputs
        self.outputs = outputs

    def __len__(self):
        return len(self.inputs)

    @abstractmethod
    def filter_indices(self, indices: Sequence[int]) -> __qualname__:
        pass


class InputOutputArrays(BaseInputOutputData[np.ndarray]):
    def __init__(self, inputs: np.ndarray, outputs: np.ndarray):
        super().__init__(inputs, outputs)

    def filter_indices(self, indices: Sequence[int]) -> __qualname__:
        inputs = self.inputs[indices]
        outputs = self.outputs[indices]
        return InputOutputArrays(inputs, outputs)

    def to_torch_data_loader(self, batch_size=64, shuffle=True):
        try:
            import torch
            from torch.utils.data import DataLoader, TensorDataset
        except ImportError:
            raise ImportError("Could not import torch, did you install it?")
        dataset = TensorDataset(torch.tensor(self.inputs), torch.tensor(self.outputs))
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)
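

# Illustrative usage (a sketch, not part of the original module): constructing
# array-backed data and selecting a subset by index; all array values are made up.
def _example_input_output_arrays():
    inputs = np.arange(12, dtype=np.float32).reshape(6, 2)
    outputs = np.arange(6, dtype=np.float32).reshape(6, 1)
    data = InputOutputArrays(inputs, outputs)
    subset = data.filter_indices([0, 2, 4])
    assert len(subset) == 3
    # to_torch_data_loader() would additionally require torch to be installed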


class InputOutputData(BaseInputOutputData[pd.DataFrame], ToStringMixin):
    """
    Holds input and output data for learning problems
    """
    def __init__(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[Union[pd.Series, "DataPointWeighting"]] = None):
        super().__init__(inputs, outputs)
        if isinstance(weights, DataPointWeighting):
            weights = weights.compute_weights(inputs, outputs)
        self.weights = weights

    def __setstate__(self, state):
        setstate(InputOutputData, self, state, new_optional_properties=["weights"])

    def _tostring_object_info(self) -> str:
        return f"N={len(self.inputs)}, numInputColumns={len(self.inputs.columns)}, numOutputColumns={len(self.outputs.columns)}"

    @classmethod
    def from_data_frame(cls, df: pd.DataFrame, *output_columns: str) -> "InputOutputData":
        """
        :param df: a data frame containing both input and output columns
        :param output_columns: the output column name(s)
        :return: an InputOutputData instance with inputs and outputs separated
        """
        inputs = df[[c for c in df.columns if c not in output_columns]]
        outputs = df[list(output_columns)]
        return cls(inputs, outputs)

    def to_data_frame(self, add_weights: bool = False, weights_col_name: str = "weights") -> pd.DataFrame:
        """
        :param add_weights: whether to add the weights as a column (provided that weights are present)
        :param weights_col_name: the column name to use for weights if `add_weights` is True
        :return: a data frame containing both the inputs and outputs (and optionally the weights)
        """
        df = pd.concat([self.inputs, self.outputs], axis=1)
        if add_weights and self.weights is not None:
            df[weights_col_name] = self.weights
        return df

    def to_df(self, add_weights: bool = False, weights_col_name: str = "weights") -> pd.DataFrame:
        return self.to_data_frame(add_weights=add_weights, weights_col_name=weights_col_name)

    def filter_indices(self, indices: Sequence[int]) -> __qualname__:
        inputs = self.inputs.iloc[indices]
        outputs = self.outputs.iloc[indices]
        weights = None
        if self.weights is not None:
            weights = self.weights.iloc[indices]
        return InputOutputData(inputs, outputs, weights)

    def filter_index(self, index_elements: Sequence[Any]) -> __qualname__:
        inputs = self.inputs.loc[index_elements]
        outputs = self.outputs.loc[index_elements]
        weights = None
        if self.weights is not None:
            # restrict the weights to the same index elements (passing the full series,
            # as the code previously did, would no longer align with the filtered data)
            weights = self.weights.loc[index_elements]
        return InputOutputData(inputs, outputs, weights)

    @property
    def input_dim(self):
        return self.inputs.shape[1]

    @property
    def output_dim(self):
        return self.outputs.shape[1]

    def compute_input_output_correlation(self):
        correlations = {}
        for outputCol in self.outputs.columns:
            correlations[outputCol] = {}
            output_series = self.outputs[outputCol]
            for inputCol in self.inputs.columns:
                input_series = self.inputs[inputCol]
                pcc, pvalue = scipy.stats.pearsonr(input_series, output_series)
                correlations[outputCol][inputCol] = pcc
        return correlations

    def apply_weighting(self, weighting: "DataPointWeighting"):
        self.weights = weighting.compute_weights(self.inputs, self.outputs)
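

# Illustrative usage (a sketch, not part of the original module): building an
# InputOutputData from a combined frame and round-tripping it; the column names
# "x1", "x2" and "y" are made up for the example.
def _example_input_output_data():
    df = pd.DataFrame({"x1": [1.0, 2.0, 3.0], "x2": [0.5, 0.4, 0.3], "y": [2.0, 4.0, 6.0]})
    io_data = InputOutputData.from_data_frame(df, "y")
    assert io_data.input_dim == 2 and io_data.output_dim == 1
    correlations = io_data.compute_input_output_correlation()  # {"y": {"x1": 1.0, "x2": -1.0}}
    round_tripped = io_data.to_data_frame()
    assert list(round_tripped.columns) == ["x1", "x2", "y"]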


TInputOutputData = TypeVar("TInputOutputData", bound=BaseInputOutputData)


class DataSplitter(ABC, Generic[TInputOutputData]):
    @abstractmethod
    def split(self, data: TInputOutputData) -> Tuple[TInputOutputData, TInputOutputData]:
        pass


class DataSplitterFractional(DataSplitter):
    def __init__(self, fractional_size_of_first_set: float, shuffle=True, random_seed=42):
        if not 0 <= fractional_size_of_first_set <= 1:
            raise ValueError(f"invalid fraction: {fractional_size_of_first_set}")
        self.fractionalSizeOfFirstSet = fractional_size_of_first_set
        self.shuffle = shuffle
        self.randomSeed = random_seed

    def split_with_indices(self, data: TInputOutputData) -> Tuple[Tuple[Sequence[int], Sequence[int]], Tuple[TInputOutputData, TInputOutputData]]:
        num_data_points = len(data)
        split_index = int(num_data_points * self.fractionalSizeOfFirstSet)
        if self.shuffle:
            rand = np.random.RandomState(self.randomSeed)
            indices = rand.permutation(num_data_points)
        else:
            indices = range(num_data_points)
        indices_a = indices[:split_index]
        indices_b = indices[split_index:]
        a = data.filter_indices(list(indices_a))
        b = data.filter_indices(list(indices_b))
        return (indices_a, indices_b), (a, b)

    def split(self, data: TInputOutputData) -> Tuple[TInputOutputData, TInputOutputData]:
        _, (a, b) = self.split_with_indices(data)
        return a, b
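

# Illustrative usage (a sketch, not part of the original module): an 80/20
# shuffled train/test split over a toy data set with made-up values.
def _example_fractional_split():
    df = pd.DataFrame({"x": np.arange(10, dtype=float), "y": np.arange(10, dtype=float) * 2})
    io_data = InputOutputData.from_data_frame(df, "y")
    train, test = DataSplitterFractional(0.8, shuffle=True, random_seed=42).split(io_data)
    assert len(train) == 8 and len(test) == 2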


class DataSplitterFromDataFrameSplitter(DataSplitter[InputOutputData]):
    """
    Creates a DataSplitter from a DataFrameSplitter, which can be applied either to the input or the output data.
    It supports only InputOutputData, not other subclasses of BaseInputOutputData.
    """
    def __init__(self, data_frame_splitter: "DataFrameSplitter", fractional_size_of_first_set: float, apply_to_input=True):
        """
        :param data_frame_splitter: the splitter to apply
        :param fractional_size_of_first_set: the desired fractional size of the first set when applying the splitter
        :param apply_to_input: if True, apply the splitter to the input data frame; if False, apply it to the output data frame
        """
        self.dataFrameSplitter = data_frame_splitter
        self.fractionalSizeOfFirstSet = fractional_size_of_first_set
        self.applyToInput = apply_to_input

    def split(self, data: InputOutputData) -> Tuple[InputOutputData, InputOutputData]:
        if not isinstance(data, InputOutputData):
            raise ValueError(f"{self} is only applicable to instances of {InputOutputData.__name__}, got {data}")
        df = data.inputs if self.applyToInput else data.outputs
        indices_a, indices_b = self.dataFrameSplitter.compute_split_indices(df, self.fractionalSizeOfFirstSet)
        a = data.filter_indices(list(indices_a))
        b = data.filter_indices(list(indices_b))
        return a, b
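

# Illustrative usage (a sketch, not part of the original module): wrapping a
# DataFrameSplitterFractional (defined further below) so it can split an
# InputOutputData instance based on its input frame; all values are made up.
def _example_data_frame_splitter_adapter():
    df = pd.DataFrame({"x": np.arange(10, dtype=float), "y": np.arange(10, dtype=float)})
    io_data = InputOutputData.from_data_frame(df, "y")
    splitter = DataSplitterFromDataFrameSplitter(DataFrameSplitterFractional(shuffle=True), 0.7)
    a, b = splitter.split(io_data)
    assert len(a) == 7 and len(b) == 3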


class DataSplitterFromSkLearnSplitter(DataSplitter):
    def __init__(self, sklearn_splitter):
        """
        :param sklearn_splitter: an instance of one of the splitter classes from sklearn.model_selection,
            see https://scikit-learn.org/stable/modules/classes.html#module-sklearn.model_selection
        """
        self.sklearn_splitter = sklearn_splitter

    def split(self, data: TInputOutputData) -> Tuple[TInputOutputData, TInputOutputData]:
        splitter_result = self.sklearn_splitter.split(data.inputs, data.outputs)
        split = next(iter(splitter_result))
        first_indices, second_indices = split
        return data.filter_indices(first_indices), data.filter_indices(second_indices)
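

# Illustrative usage (a sketch, not part of the original module): plugging in an
# sklearn splitter directly; ShuffleSplit is a standard sklearn.model_selection class,
# and the data values are made up.
def _example_sklearn_splitter():
    from sklearn.model_selection import ShuffleSplit
    df = pd.DataFrame({"x": np.arange(10, dtype=float), "y": np.arange(10, dtype=float)})
    io_data = InputOutputData.from_data_frame(df, "y")
    splitter = DataSplitterFromSkLearnSplitter(ShuffleSplit(n_splits=1, train_size=0.8, random_state=42))
    a, b = splitter.split(io_data)
    assert len(a) == 8 and len(b) == 2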


class DataSplitterStratifiedShuffleSplit(DataSplitterFromSkLearnSplitter):
    def __init__(self, fractional_size_of_first_set: float, random_seed=42):
        super().__init__(StratifiedShuffleSplit(n_splits=1, train_size=fractional_size_of_first_set, random_state=random_seed))

    @staticmethod
    def is_applicable(io_data: InputOutputData):
        class_counts = io_data.outputs.value_counts()
        return all(class_counts >= 2)
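

# Illustrative usage (a sketch, not part of the original module): a stratified
# split for a classification target; the class labels below are made up.
def _example_stratified_split():
    df = pd.DataFrame({"x": np.arange(8, dtype=float), "label": [0, 0, 0, 0, 1, 1, 1, 1]})
    io_data = InputOutputData.from_data_frame(df, "label")
    assert DataSplitterStratifiedShuffleSplit.is_applicable(io_data)  # every class occurs at least twice
    a, b = DataSplitterStratifiedShuffleSplit(0.75).split(io_data)
    assert len(a) == 6 and len(b) == 2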


class DataFrameSplitter(ABC):
    @abstractmethod
    def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[Sequence[int], Sequence[int]]:
        pass

    @staticmethod
    def split_with_indices(df: pd.DataFrame, indices_pair: Tuple[Sequence[int], Sequence[int]]) -> Tuple[pd.DataFrame, pd.DataFrame]:
        indices_a, indices_b = indices_pair
        a = df.iloc[indices_a]
        b = df.iloc[indices_b]
        return a, b

    def split(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[pd.DataFrame, pd.DataFrame]:
        return self.split_with_indices(df, self.compute_split_indices(df, fractional_size_of_first_set))


class DataFrameSplitterFractional(DataFrameSplitter):
    def __init__(self, shuffle=False, random_seed=42):
        self.randomSeed = random_seed
        self.shuffle = shuffle

    def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[Sequence[int], Sequence[int]]:
        n = df.shape[0]
        size_a = int(n * fractional_size_of_first_set)
        if self.shuffle:
            rand = np.random.RandomState(self.randomSeed)
            indices = rand.permutation(n)
        else:
            indices = list(range(n))
        indices_a = indices[:size_a]
        indices_b = indices[size_a:]
        return indices_a, indices_b
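

# Illustrative usage (a sketch, not part of the original module): splitting a raw
# data frame 70/30 without shuffling, preserving row order; values are made up.
def _example_df_fractional_split():
    df = pd.DataFrame({"x": np.arange(10)})
    a, b = DataFrameSplitterFractional().split(df, 0.7)
    assert len(a) == 7 and len(b) == 3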


class DataFrameSplitterColumnEquivalenceClass(DataFrameSplitter):
    """
    Performs a split that keeps together data points/rows that have the same value in a given column, i.e.
    with respect to that column, the items having the same values are viewed as a unit; they form an equivalence class, and all
    data points belonging to the same class are either in the first set or the second set.

    The split is performed at the level of unique items in the column, i.e. the given fraction of equivalence
    classes will end up in the first set and the rest in the second set.

    The list of unique items in the column can be shuffled before applying the split. If no shuffling is applied,
    the original order in the data frame is maintained, and if the items were grouped by equivalence class in the
    original data frame, the split will correspond to a fractional split without shuffling where the split boundary
    is adjusted to not separate an equivalence class.
    """
    def __init__(self, column: str, shuffle=True, random_seed=42):
        """
        :param column: the column which defines the equivalence classes (groups of data points/rows that must not be separated)
        :param shuffle: whether to shuffle the list of unique values in the given column before applying the split
        :param random_seed: the random seed to use for shuffling
        """
        self.column = column
        self.shuffle = shuffle
        self.random_seed = random_seed

    def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[Sequence[int], Sequence[int]]:
        values = list(df[self.column].unique())
        if self.shuffle:
            rng = random.Random(self.random_seed)
            rng.shuffle(values)

        num_items_in_first_set = round(fractional_size_of_first_set * len(values))
        first_set_values = set(values[:num_items_in_first_set])

        # assign each row to the first or second set based on its equivalence class
        first_set_indices = []
        second_set_indices = []
        for i, t in enumerate(df.itertuples()):
            if getattr(t, self.column) in first_set_values:
                first_set_indices.append(i)
            else:
                second_set_indices.append(i)
        return first_set_indices, second_set_indices
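

# Illustrative usage (a sketch, not part of the original module): splitting by a
# "group" column such that rows sharing a group value stay together; data is made up.
def _example_equivalence_class_split():
    df = pd.DataFrame({"group": ["a", "a", "b", "b", "c", "c"], "x": range(6)})
    splitter = DataFrameSplitterColumnEquivalenceClass("group", shuffle=True, random_seed=42)
    a, b = splitter.split(df, 2 / 3)
    # no group value appears in both parts
    assert set(a["group"]).isdisjoint(set(b["group"]))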


class DataPointWeighting(ABC):
    @abstractmethod
    def compute_weights(self, x: pd.DataFrame, y: pd.DataFrame) -> pd.Series:
        pass


class DataPointWeightingRegressionTargetIntervalTotalWeight(DataPointWeighting):
    """
    Based on relative weights specified for intervals of the regression target,
    will weight individual data point weights such that the sum of weights of data points within each interval
    satisfies the user-specified relative weight, while ensuring that the total weight of all data points
    is still equal to the number of data points.

    For example, if one specifies `intervals_weights` as [(0.5, 1), (inf, 2)], then the data points with target values
    up to 0.5 will get 1/3 of the weight and the remaining data points will get 2/3 of the weight.
    So if there are 100 data points and 50 of them are in the first interval (up to 0.5), then these 50 data points
    will each get weight 1/3*100/50=2/3 and the remaining 50 data points will each get weight 2/3*100/50=4/3.
    The sum of all weights is the number of data points, i.e. 100.

    Example:

    >>> targets = [0.1, 0.2, 0.5, 0.7, 0.8, 0.6]
    >>> x = pd.DataFrame({"foo": np.zeros(len(targets))})
    >>> y = pd.DataFrame({"target": targets})
    >>> weighting = DataPointWeightingRegressionTargetIntervalTotalWeight([(0.5, 1), (1.0, 2)])
    >>> weights = weighting.compute_weights(x, y)
    >>> assert np.isclose(weights.sum(), len(y))
    >>> weights.tolist()
    [0.6666666666666666,
     0.6666666666666666,
     0.6666666666666666,
     1.3333333333333333,
     1.3333333333333333,
     1.3333333333333333]
    """
    def __init__(self, intervals_weights: Sequence[Tuple[float, float]]):
        """
        :param intervals_weights: a sequence of tuples (upper_bound, rel_total_weight) where upper_bound is the upper bound
            of the interval, `(lower_bound, upper_bound]`; `lower_bound` is the upper bound of the preceding interval
            or -inf for the first interval. `rel_total_weight` specifies the relative weight of all data points within
            the interval.
        """
        a = -math.inf
        sum_rel_weights = sum(t[1] for t in intervals_weights)
        self.intervals = []
        for b, rel_weight in intervals_weights:
            self.intervals.append(self.Interval(a, b, rel_weight / sum_rel_weights))
            a = b

    class Interval:
        def __init__(self, a: float, b: float, weight_fraction: float):
            self.a = a
            self.b = b
            self.weight_fraction = weight_fraction

        def contains(self, x: float):
            return self.a < x <= self.b

    def compute_weights(self, x: pd.DataFrame, y: pd.DataFrame) -> pd.Series:
        assert len(y.columns) == 1, f"Only a single regression target is supported by {self.__class__.__name__}"
        targets = y.iloc[:, 0]
        n = len(x)
        weights = np.zeros(n)
        num_weighted = 0
        for interval in self.intervals:
            # iterate over the target values (avoiding shadowing of the parameter x)
            mask = np.array([interval.contains(value) for value in targets])
            subset_size = mask.sum()
            num_weighted += subset_size
            if subset_size > 0:  # guard against division by zero for empty intervals
                weights[mask] = interval.weight_fraction * n / subset_size
        if num_weighted != n:
            raise Exception("Not all data points were weighted. Most likely, the intervals do not cover the entire range of targets")
        return pd.Series(weights, index=x.index)
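

# Illustrative usage (a sketch, not part of the original module): attaching
# interval-based weights to an InputOutputData instance; all values are made up.
def _example_apply_weighting():
    df = pd.DataFrame({"x": [0.0, 1.0, 2.0, 3.0], "target": [0.1, 0.4, 0.6, 0.9]})
    io_data = InputOutputData.from_data_frame(df, "target")
    # give the targets up to 0.5 and those in (0.5, 1.0] equal total weight
    io_data.apply_weighting(DataPointWeightingRegressionTargetIntervalTotalWeight([(0.5, 1), (1.0, 1)]))
    assert np.isclose(io_data.weights.sum(), len(io_data))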