Coverage for src/sensai/data/io_data.py: 52%

226 statements  

coverage.py v7.6.1, created at 2024-11-29 18:29 +0000

import logging
import math
import random
from abc import ABC, abstractmethod
from typing import Any, Tuple, Sequence, TypeVar, Generic, Optional, Union

import numpy as np
import pandas as pd
import scipy.stats
from sklearn.model_selection import StratifiedShuffleSplit

from ..util.pickle import setstate
from ..util.string import ToStringMixin

log = logging.getLogger(__name__)

T = TypeVar("T")



class BaseInputOutputData(Generic[T], ABC):
    def __init__(self, inputs: T, outputs: T):
        """
        :param inputs: expected to have shape and __len__
        :param outputs: expected to have shape and __len__
        """
        if len(inputs) != len(outputs):
            raise ValueError("Lengths do not match")
        self.inputs = inputs
        self.outputs = outputs

    def __len__(self):
        return len(self.inputs)

    @abstractmethod
    def filter_indices(self, indices: Sequence[int]) -> __qualname__:
        pass



class InputOutputArrays(BaseInputOutputData[np.ndarray]):
    def __init__(self, inputs: np.ndarray, outputs: np.ndarray):
        super().__init__(inputs, outputs)

    def filter_indices(self, indices: Sequence[int]) -> __qualname__:
        inputs = self.inputs[indices]
        outputs = self.outputs[indices]
        return InputOutputArrays(inputs, outputs)

    def to_torch_data_loader(self, batch_size=64, shuffle=True):
        try:
            import torch
            from torch.utils.data import DataLoader, TensorDataset
        except ImportError:
            raise ImportError("Could not import torch, did you install it?")
        dataset = TensorDataset(torch.tensor(self.inputs), torch.tensor(self.outputs))
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)


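# Minimal usage sketch (illustrative shapes; requires torch to be installed):
# wraps numpy input/output arrays and iterates over batched torch tensors.
def _example_to_torch_data_loader():
    arrays = InputOutputArrays(np.random.rand(8, 3), np.random.rand(8, 1))
    loader = arrays.to_torch_data_loader(batch_size=4, shuffle=False)
    for batch_inputs, batch_outputs in loader:
        print(batch_inputs.shape, batch_outputs.shape)  # torch.Size([4, 3]) torch.Size([4, 1])

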

class InputOutputData(BaseInputOutputData[pd.DataFrame], ToStringMixin):
    """
    Holds input and output data for learning problems
    """
    def __init__(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[Union[pd.Series, "DataPointWeighting"]] = None):
        super().__init__(inputs, outputs)
        if isinstance(weights, DataPointWeighting):
            weights = weights.compute_weights(inputs, outputs)
        self.weights = weights

    def __setstate__(self, state):
        setstate(InputOutputData, self, state, new_optional_properties=["weights"])

    def _tostring_object_info(self) -> str:
        return f"N={len(self.inputs)}, numInputColumns={len(self.inputs.columns)}, numOutputColumns={len(self.outputs.columns)}"

    @classmethod
    def from_data_frame(cls, df: pd.DataFrame, *output_columns: str) -> "InputOutputData":
        """
        :param df: a data frame containing both input and output columns
        :param output_columns: the output column name(s)
        :return: an InputOutputData instance with inputs and outputs separated
        """
        inputs = df[[c for c in df.columns if c not in output_columns]]
        outputs = df[list(output_columns)]
        return cls(inputs, outputs)

    def to_data_frame(self, add_weights: bool = False, weights_col_name: str = "weights") -> pd.DataFrame:
        """
        :param add_weights: whether to add the weights as a column (provided that weights are present)
        :param weights_col_name: the column name to use for weights if `add_weights` is True
        :return: a data frame containing both the inputs and outputs (and optionally the weights)
        """
        df = pd.concat([self.inputs, self.outputs], axis=1)
        if add_weights and self.weights is not None:
            df[weights_col_name] = self.weights
        return df

    def to_df(self, add_weights: bool = False, weights_col_name: str = "weights") -> pd.DataFrame:
        return self.to_data_frame(add_weights=add_weights, weights_col_name=weights_col_name)

    def filter_indices(self, indices: Sequence[int]) -> __qualname__:
        inputs = self.inputs.iloc[indices]
        outputs = self.outputs.iloc[indices]
        weights = None
        if self.weights is not None:
            weights = self.weights.iloc[indices]
        return InputOutputData(inputs, outputs, weights)


    def filter_index(self, index_elements: Sequence[Any]) -> __qualname__:
        inputs = self.inputs.loc[index_elements]
        outputs = self.outputs.loc[index_elements]
        weights = None
        if self.weights is not None:
            weights = self.weights.loc[index_elements]
        return InputOutputData(inputs, outputs, weights)


    @property
    def input_dim(self):
        return self.inputs.shape[1]

    @property
    def output_dim(self):
        return self.outputs.shape[1]

    def compute_input_output_correlation(self):
        correlations = {}
        for output_col in self.outputs.columns:
            correlations[output_col] = {}
            output_series = self.outputs[output_col]
            for input_col in self.inputs.columns:
                input_series = self.inputs[input_col]
                pcc, pvalue = scipy.stats.pearsonr(input_series, output_series)
                correlations[output_col][input_col] = pcc
        return correlations

    def apply_weighting(self, weighting: "DataPointWeighting"):
        self.weights = weighting.compute_weights(self.inputs, self.outputs)


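# Minimal usage sketch (column names are illustrative assumptions): builds an
# InputOutputData from a single data frame and inspects its basic properties.
def _example_input_output_data():
    df = pd.DataFrame({"x1": [1.0, 2.0, 3.0], "x2": [0.5, 0.1, 0.9], "y": [1.5, 2.1, 3.9]})
    io_data = InputOutputData.from_data_frame(df, "y")  # "y" becomes the output column
    print(io_data.input_dim, io_data.output_dim)        # 2 1
    print(io_data.compute_input_output_correlation())   # {'y': {'x1': ..., 'x2': ...}}

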

TInputOutputData = TypeVar("TInputOutputData", bound=BaseInputOutputData)


class DataSplitter(ABC, Generic[TInputOutputData]):
    @abstractmethod
    def split(self, data: TInputOutputData) -> Tuple[TInputOutputData, TInputOutputData]:
        pass



class DataSplitterFractional(DataSplitter):
    def __init__(self, fractional_size_of_first_set: float, shuffle=True, random_seed=42):
        if not 0 <= fractional_size_of_first_set <= 1:
            raise ValueError(f"invalid fraction: {fractional_size_of_first_set}")

        self.fractionalSizeOfFirstSet = fractional_size_of_first_set
        self.shuffle = shuffle
        self.randomSeed = random_seed

    def split_with_indices(self, data) -> Tuple[Tuple[Sequence[int], Sequence[int]], Tuple[TInputOutputData, TInputOutputData]]:
        num_data_points = len(data)
        split_index = int(num_data_points * self.fractionalSizeOfFirstSet)
        if self.shuffle:
            rand = np.random.RandomState(self.randomSeed)
            indices = rand.permutation(num_data_points)
        else:
            indices = range(num_data_points)
        indices_a = indices[:split_index]
        indices_b = indices[split_index:]
        a = data.filter_indices(list(indices_a))
        b = data.filter_indices(list(indices_b))
        return (indices_a, indices_b), (a, b)

    def split(self, data: TInputOutputData) -> Tuple[TInputOutputData, TInputOutputData]:
        _, (a, b) = self.split_with_indices(data)
        return a, b


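# Minimal usage sketch (80/20 split; values illustrative): a shuffled fractional
# split of an InputOutputData into training and test sets.
def _example_fractional_split():
    df = pd.DataFrame({"x": range(10), "y": range(10)})
    io_data = InputOutputData.from_data_frame(df, "y")
    train_data, test_data = DataSplitterFractional(0.8, shuffle=True, random_seed=42).split(io_data)
    print(len(train_data), len(test_data))  # 8 2

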

class DataSplitterFromDataFrameSplitter(DataSplitter[InputOutputData]):
    """
    Creates a DataSplitter from a DataFrameSplitter, which can be applied either to the input or the output data.
    It supports only InputOutputData, not other subclasses of BaseInputOutputData.
    """
    def __init__(self, data_frame_splitter: "DataFrameSplitter", fractional_size_of_first_set: float, apply_to_input=True):
        """
        :param data_frame_splitter: the splitter to apply
        :param fractional_size_of_first_set: the desired fractional size of the first set when applying the splitter
        :param apply_to_input: if True, apply the splitter to the input data frame; if False, apply it to the output data frame
        """
        self.dataFrameSplitter = data_frame_splitter
        self.fractionalSizeOfFirstSet = fractional_size_of_first_set
        self.applyToInput = apply_to_input

    def split(self, data: InputOutputData) -> Tuple[InputOutputData, InputOutputData]:
        if not isinstance(data, InputOutputData):
            raise ValueError(f"{self} is only applicable to instances of {InputOutputData.__name__}, got {data}")
        df = data.inputs if self.applyToInput else data.outputs
        indices_a, indices_b = self.dataFrameSplitter.compute_split_indices(df, self.fractionalSizeOfFirstSet)
        a = data.filter_indices(list(indices_a))
        b = data.filter_indices(list(indices_b))
        return a, b



class DataSplitterFromSkLearnSplitter(DataSplitter):
    def __init__(self, sklearn_splitter):
        """
        :param sklearn_splitter: an instance of one of the splitter classes from sklearn.model_selection,
            see https://scikit-learn.org/stable/modules/classes.html#module-sklearn.model_selection
        """
        self.sklearn_splitter = sklearn_splitter

    def split(self, data: TInputOutputData) -> Tuple[TInputOutputData, TInputOutputData]:
        splitter_result = self.sklearn_splitter.split(data.inputs, data.outputs)
        split = next(iter(splitter_result))
        first_indices, second_indices = split
        return data.filter_indices(first_indices), data.filter_indices(second_indices)


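# Minimal usage sketch (uses sklearn's ShuffleSplit; parameters illustrative):
# any sklearn.model_selection splitter can be wrapped this way, with only the
# first of its splits being used.
def _example_sklearn_splitter():
    from sklearn.model_selection import ShuffleSplit

    df = pd.DataFrame({"x": range(10), "y": range(10)})
    io_data = InputOutputData.from_data_frame(df, "y")
    splitter = DataSplitterFromSkLearnSplitter(ShuffleSplit(n_splits=1, train_size=0.7, random_state=0))
    a, b = splitter.split(io_data)
    print(len(a), len(b))  # 7 3

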

class DataSplitterStratifiedShuffleSplit(DataSplitterFromSkLearnSplitter):
    def __init__(self, fractional_size_of_first_set: float, random_seed=42):
        super().__init__(StratifiedShuffleSplit(n_splits=1, train_size=fractional_size_of_first_set, random_state=random_seed))

    @staticmethod
    def is_applicable(io_data: InputOutputData):
        class_counts = io_data.outputs.value_counts()
        return all(class_counts >= 2)


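# Minimal usage sketch (binary labels assumed): stratification requires at least
# two occurrences of each class, which is_applicable checks beforehand.
def _example_stratified_split():
    df = pd.DataFrame({"x": range(8), "label": [0, 1, 0, 1, 0, 1, 0, 1]})
    io_data = InputOutputData.from_data_frame(df, "label")
    if DataSplitterStratifiedShuffleSplit.is_applicable(io_data):
        a, b = DataSplitterStratifiedShuffleSplit(0.5).split(io_data)
        print(len(a), len(b))  # 4 4

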

class DataFrameSplitter(ABC):
    @abstractmethod
    def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[Sequence[int], Sequence[int]]:
        pass

    @staticmethod
    def split_with_indices(df: pd.DataFrame, indices_pair: Tuple[Sequence[int], Sequence[int]]) -> Tuple[pd.DataFrame, pd.DataFrame]:
        indices_a, indices_b = indices_pair
        a = df.iloc[indices_a]
        b = df.iloc[indices_b]
        return a, b

    def split(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[pd.DataFrame, pd.DataFrame]:
        return self.split_with_indices(df, self.compute_split_indices(df, fractional_size_of_first_set))



class DataFrameSplitterFractional(DataFrameSplitter):
    def __init__(self, shuffle=False, random_seed=42):
        self.randomSeed = random_seed
        self.shuffle = shuffle

    def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[Sequence[int], Sequence[int]]:
        n = df.shape[0]
        size_a = int(n * fractional_size_of_first_set)
        if self.shuffle:
            rand = np.random.RandomState(self.randomSeed)
            indices = rand.permutation(n)
        else:
            indices = list(range(n))
        indices_a = indices[:size_a]
        indices_b = indices[size_a:]
        return indices_a, indices_b


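# Minimal usage sketch: splits a plain data frame 75/25 in the original row order
# (shuffle defaults to False for this splitter, unlike DataSplitterFractional).
def _example_data_frame_splitter_fractional():
    df = pd.DataFrame({"value": range(8)})
    first, second = DataFrameSplitterFractional().split(df, 0.75)
    print(len(first), len(second))  # 6 2

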

class DataFrameSplitterColumnEquivalenceClass(DataFrameSplitter):
    """
    Performs a split that keeps together data points/rows that have the same value in a given column, i.e.
    with respect to that column, the items having the same values are viewed as a unit; they form an equivalence class, and all
    data points belonging to the same class are either in the first set or the second set.

    The split is performed at the level of unique items in the column, i.e. the given fraction of equivalence
    classes will end up in the first set and the rest in the second set.

    The list of unique items in the column can be shuffled before applying the split. If no shuffling is applied,
    the original order in the data frame is maintained, and if the items were grouped by equivalence class in the
    original data frame, the split will correspond to a fractional split without shuffling where the split boundary
    is adjusted to not separate an equivalence class.
    """

    def __init__(self, column: str, shuffle=True, random_seed=42):
        """
        :param column: the column which defines the equivalence classes (groups of data points/rows that must not be separated)
        :param shuffle: whether to shuffle the list of unique values in the given column before applying the split
        :param random_seed: the random seed to use for shuffling
        """
        self.column = column
        self.shuffle = shuffle
        self.random_seed = random_seed

    def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[Sequence[int], Sequence[int]]:
        values = list(df[self.column].unique())
        if self.shuffle:
            rng = random.Random(self.random_seed)
            rng.shuffle(values)

        num_items_in_first_set = round(fractional_size_of_first_set * len(values))
        first_set_values = set(values[:num_items_in_first_set])

        first_set_indices = []
        second_set_indices = []
        for i, t in enumerate(df.itertuples()):
            if getattr(t, self.column) in first_set_values:
                first_set_indices.append(i)
            else:
                second_set_indices.append(i)
        return first_set_indices, second_set_indices


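# Minimal usage sketch (a "user_id" grouping column is assumed): all rows sharing
# a user_id land in the same set; also shows wrapping the splitter as a DataSplitter.
def _example_equivalence_class_split():
    df = pd.DataFrame({"user_id": [1, 1, 2, 2, 3, 3], "x": range(6), "y": range(6)})
    splitter = DataFrameSplitterColumnEquivalenceClass("user_id", shuffle=True, random_seed=42)
    first, second = splitter.split(df, 2 / 3)
    assert set(first["user_id"]).isdisjoint(second["user_id"])
    # the same splitter applied to an InputOutputData via DataSplitterFromDataFrameSplitter:
    io_data = InputOutputData.from_data_frame(df, "y")
    a, b = DataSplitterFromDataFrameSplitter(splitter, 2 / 3).split(io_data)

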

class DataPointWeighting(ABC):
    @abstractmethod
    def compute_weights(self, x: pd.DataFrame, y: pd.DataFrame) -> pd.Series:
        pass



class DataPointWeightingRegressionTargetIntervalTotalWeight(DataPointWeighting):
    """
    Based on relative weights specified for intervals of the regression target,
    sets the individual data point weights such that the sum of weights of the data points within each interval
    matches the user-specified relative weight, while ensuring that the total weight of all data points
    is still equal to the number of data points.

    For example, if one specifies `intervals_weights` as [(0.5, 1), (inf, 2)], then the data points with target values
    up to 0.5 will get 1/3 of the total weight and the remaining data points will get 2/3 of the total weight.
    So if there are 100 data points and 50 of them are in the first interval (up to 0.5), then these 50 data points
    will each get weight 1/3*100/50=2/3 and the remaining 50 data points will each get weight 2/3*100/50=4/3.
    The sum of all weights is the number of data points, i.e. 100.

    Example:

    >>> targets = [0.1, 0.2, 0.5, 0.7, 0.8, 0.6]
    >>> x = pd.DataFrame({"foo": np.zeros(len(targets))})
    >>> y = pd.DataFrame({"target": targets})
    >>> weighting = DataPointWeightingRegressionTargetIntervalTotalWeight([(0.5, 1), (1.0, 2)])
    >>> weights = weighting.compute_weights(x, y)
    >>> assert np.isclose(weights.sum(), len(y))
    >>> weights.tolist()
    [0.6666666666666666,
     0.6666666666666666,
     0.6666666666666666,
     1.3333333333333333,
     1.3333333333333333,
     1.3333333333333333]
    """

    def __init__(self, intervals_weights: Sequence[Tuple[float, float]]):
        """
        :param intervals_weights: a sequence of tuples (upper_bound, rel_total_weight) where upper_bound is the upper bound
            of the interval, `(lower_bound, upper_bound]`; `lower_bound` is the upper bound of the preceding interval
            or -inf for the first interval. `rel_total_weight` specifies the relative weight of all data points within
            the interval.
        """
        a = -math.inf
        sum_rel_weights = sum(t[1] for t in intervals_weights)
        self.intervals = []
        for b, rel_weight in intervals_weights:
            self.intervals.append(self.Interval(a, b, rel_weight / sum_rel_weights))
            a = b

    class Interval:
        def __init__(self, a: float, b: float, weight_fraction: float):
            self.a = a
            self.b = b
            self.weight_fraction = weight_fraction

        def contains(self, x: float):
            return self.a < x <= self.b


    def compute_weights(self, x: pd.DataFrame, y: pd.DataFrame) -> pd.Series:
        assert len(y.columns) == 1, f"Only a single regression target is supported by {self.__class__.__name__}"
        targets = y.iloc[:, 0]
        n = len(x)
        weights = np.zeros(n)
        num_weighted = 0
        for interval in self.intervals:
            mask = np.array([interval.contains(v) for v in targets])
            subset_size = mask.sum()
            num_weighted += subset_size
            weights[mask] = interval.weight_fraction * n / subset_size
        if num_weighted != n:
            raise Exception("Not all data points were weighted. Most likely, the intervals do not cover the entire range of targets")
        return pd.Series(weights, index=x.index)
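

# Minimal usage sketch (target intervals are illustrative): attaches weights to an
# InputOutputData so that each target interval carries the stated share of the total weight.
def _example_apply_weighting():
    df = pd.DataFrame({"x": np.zeros(6), "target": [0.1, 0.2, 0.5, 0.7, 0.8, 0.6]})
    io_data = InputOutputData.from_data_frame(df, "target")
    io_data.apply_weighting(DataPointWeightingRegressionTargetIntervalTotalWeight([(0.5, 1), (1.0, 2)]))
    assert np.isclose(io_data.weights.sum(), len(io_data))  # total weight equals the number of data points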