Coverage for src/sensai/data/io_data.py: 57%

173 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-13 22:17 +0000

import logging
import random
from abc import ABC, abstractmethod
from typing import Any, Tuple, Sequence, TypeVar, Generic

import numpy as np
import pandas as pd
import scipy.stats
from sklearn.model_selection import StratifiedShuffleSplit

from ..util.string import ToStringMixin

12 

log = logging.getLogger(__name__)  # module-level logger named after this module

T = TypeVar("T")  # type of the container holding inputs/outputs (e.g. np.ndarray or pd.DataFrame)

16 

17 

class BaseInputOutputData(Generic[T], ABC):
    """
    Base class for holding paired input and output data of equal length.
    """
    def __init__(self, inputs: T, outputs: T):
        """
        :param inputs: expected to have shape and __len__
        :param outputs: expected to have shape and __len__
        :raise ValueError: if the lengths of inputs and outputs do not match
        """
        if len(inputs) != len(outputs):
            # include the mismatching lengths to make the error actionable
            raise ValueError(f"Lengths do not match: {len(inputs)} inputs, {len(outputs)} outputs")
        self.inputs = inputs
        self.outputs = outputs

    def __len__(self):
        return len(self.inputs)

    @abstractmethod
    def filter_indices(self, indices: Sequence[int]) -> "BaseInputOutputData":
        """
        :param indices: the indices of the data points to keep
        :return: a new instance containing only the data points at the given indices
        """

35 

36 

class InputOutputArrays(BaseInputOutputData[np.ndarray]):
    """
    Holds input and output data as row-aligned numpy arrays.
    """
    def __init__(self, inputs: np.ndarray, outputs: np.ndarray):
        super().__init__(inputs, outputs)

    def filter_indices(self, indices: Sequence[int]) -> "InputOutputArrays":
        inputs = self.inputs[indices]
        outputs = self.outputs[indices]
        return InputOutputArrays(inputs, outputs)

    def to_torch_data_loader(self, batch_size=64, shuffle=True):
        """
        :param batch_size: the batch size to use in the data loader
        :param shuffle: whether the data loader shall shuffle the data
        :return: a torch DataLoader serving (input, output) tensor pairs
        :raise ImportError: if torch is not installed
        """
        try:
            import torch
            from torch.utils.data import DataLoader, TensorDataset
        except ImportError as e:
            # fix: message was a placeholder-less f-string; also preserve the cause chain
            raise ImportError("Could not import torch, did you install it?") from e
        dataset = TensorDataset(torch.tensor(self.inputs), torch.tensor(self.outputs))
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

54 

55 

class InputOutputData(BaseInputOutputData[pd.DataFrame], ToStringMixin):
    """
    Holds input and output data for learning problems
    """
    def __init__(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        """
        :param inputs: data frame of inputs (one row per data point)
        :param outputs: data frame of outputs (row-aligned with inputs)
        """
        super().__init__(inputs, outputs)

    def _tostring_object_info(self) -> str:
        return f"N={len(self.inputs)}, numInputColumns={len(self.inputs.columns)}, numOutputColumns={len(self.outputs.columns)}"

    @classmethod
    def from_data_frame(cls, df: pd.DataFrame, *output_columns: str) -> "InputOutputData":
        """
        :param df: a data frame containing both input and output columns
        :param output_columns: the output column name(s)
        :return: an InputOutputData instance with inputs and outputs separated
        """
        inputs = df[[c for c in df.columns if c not in output_columns]]
        outputs = df[list(output_columns)]
        return cls(inputs, outputs)

    def filter_indices(self, indices: Sequence[int]) -> "InputOutputData":
        inputs = self.inputs.iloc[indices]
        outputs = self.outputs.iloc[indices]
        return InputOutputData(inputs, outputs)

    def filter_index(self, index_elements: Sequence[Any]) -> "InputOutputData":
        # fix: annotation previously used the builtin function `any` instead of typing.Any
        inputs = self.inputs.loc[index_elements]
        outputs = self.outputs.loc[index_elements]
        return InputOutputData(inputs, outputs)

    @property
    def input_dim(self):
        """The number of input columns"""
        return self.inputs.shape[1]

    @property
    def output_dim(self):
        """The number of output columns"""
        return self.outputs.shape[1]

    def compute_input_output_correlation(self):
        """
        :return: a nested dict mapping output column -> input column -> Pearson correlation coefficient
        """
        correlations = {}
        for output_col in self.outputs.columns:
            correlations[output_col] = {}
            output_series = self.outputs[output_col]
            for input_col in self.inputs.columns:
                input_series = self.inputs[input_col]
                pcc, pvalue = scipy.stats.pearsonr(input_series, output_series)
                correlations[output_col][input_col] = pcc
        return correlations

    def to_df(self) -> pd.DataFrame:
        """
        :return: a single data frame containing both the input and output columns
        """
        return pd.concat((self.inputs, self.outputs), axis=1)

108 

109 

110TInputOutputData = TypeVar("TInputOutputData", bound=BaseInputOutputData) 

111 

112 

class DataSplitter(ABC, Generic[TInputOutputData]):
    """
    Abstraction for splitting an input-output data set into two disjoint parts.
    """

    @abstractmethod
    def split(self, data: TInputOutputData) -> Tuple[TInputOutputData, TInputOutputData]:
        """
        :param data: the data to split
        :return: a pair (first set, second set)
        """

117 

118 

class DataSplitterFractional(DataSplitter):
    """
    Splits data into two disjoint sets, the first containing the given fraction of the data points.
    """
    def __init__(self, fractional_size_of_first_set: float, shuffle=True, random_seed=42):
        """
        :param fractional_size_of_first_set: the fraction (in [0, 1]) of data points to place in the first set
        :param shuffle: whether to randomly permute the data points before splitting
        :param random_seed: the random seed used for the permutation (when shuffle is enabled)
        :raise ValueError: if the fraction is not in [0, 1]
        """
        if not 0 <= fractional_size_of_first_set <= 1:
            # fix: raise the specific ValueError rather than the generic Exception
            raise ValueError(f"invalid fraction: {fractional_size_of_first_set}")
        self.fractionalSizeOfFirstSet = fractional_size_of_first_set
        self.shuffle = shuffle
        self.randomSeed = random_seed

    def split_with_indices(self, data) -> Tuple[Tuple[Sequence[int], Sequence[int]], Tuple[TInputOutputData, TInputOutputData]]:
        """
        :param data: the data to split
        :return: a pair ((indices of first set, indices of second set), (first set, second set))
        """
        num_data_points = len(data)
        split_index = int(num_data_points * self.fractionalSizeOfFirstSet)
        if self.shuffle:
            # use a dedicated RandomState so results are reproducible and global state is untouched
            rand = np.random.RandomState(self.randomSeed)
            indices = rand.permutation(num_data_points)
        else:
            indices = range(num_data_points)
        indices_a = indices[:split_index]
        indices_b = indices[split_index:]
        a = data.filter_indices(list(indices_a))
        b = data.filter_indices(list(indices_b))
        return (indices_a, indices_b), (a, b)

    def split(self, data: TInputOutputData) -> Tuple[TInputOutputData, TInputOutputData]:
        _, (a, b) = self.split_with_indices(data)
        return a, b

144 

145 

class DataSplitterFromDataFrameSplitter(DataSplitter[InputOutputData]):
    """
    Creates a DataSplitter from a DataFrameSplitter, which can be applied either to the input or the output data.
    It supports only InputOutputData, not other subclasses of BaseInputOutputData.
    """
    def __init__(self, data_frame_splitter: "DataFrameSplitter", fractional_size_of_first_set: float, apply_to_input=True):
        """
        :param data_frame_splitter: the splitter to apply
        :param fractional_size_of_first_set: the desired fractional size of the first set when applying the splitter
        :param apply_to_input: if True, apply the splitter to the input data frame; if False, apply it to the output data frame
        """
        self.dataFrameSplitter = data_frame_splitter
        self.fractionalSizeOfFirstSet = fractional_size_of_first_set
        self.applyToInput = apply_to_input

    def split(self, data: InputOutputData) -> Tuple[InputOutputData, InputOutputData]:
        """
        :param data: the InputOutputData instance to split
        :return: a pair (first set, second set)
        :raise ValueError: if data is not an InputOutputData instance
        """
        if not isinstance(data, InputOutputData):
            raise ValueError(f"{self} is only applicable to instances of {InputOutputData.__name__}, got {data}")
        # choose the data frame to which the underlying splitter is applied
        relevant_df = data.inputs if self.applyToInput else data.outputs
        indices_a, indices_b = self.dataFrameSplitter.compute_split_indices(relevant_df, self.fractionalSizeOfFirstSet)
        first_set = data.filter_indices(list(indices_a))
        second_set = data.filter_indices(list(indices_b))
        return first_set, second_set

169 

170 

class DataSplitterFromSkLearnSplitter(DataSplitter):
    """
    Adapts an sklearn splitter to the DataSplitter interface, using only the first split it generates.
    """
    def __init__(self, sklearn_splitter):
        """
        :param sklearn_splitter: an instance of one of the splitter classes from sklearn.model_selection,
            see https://scikit-learn.org/stable/modules/classes.html#module-sklearn.model_selection
        """
        self.sklearn_splitter = sklearn_splitter

    def split(self, data: TInputOutputData) -> Tuple[TInputOutputData, TInputOutputData]:
        # the sklearn splitter yields one or more (train indices, test indices) pairs;
        # we use only the first
        first_indices, second_indices = next(iter(self.sklearn_splitter.split(data.inputs, data.outputs)))
        return data.filter_indices(first_indices), data.filter_indices(second_indices)

184 

185 

class DataSplitterStratifiedShuffleSplit(DataSplitterFromSkLearnSplitter):
    """
    Splits data via a stratified shuffle split, preserving class proportions in both resulting sets.
    """
    def __init__(self, fractional_size_of_first_set: float, random_seed=42):
        splitter = StratifiedShuffleSplit(n_splits=1, train_size=fractional_size_of_first_set, random_state=random_seed)
        super().__init__(splitter)

    @staticmethod
    def is_applicable(io_data: InputOutputData):
        # stratification requires every class to occur at least twice
        class_counts = io_data.outputs.value_counts()
        return all(count >= 2 for count in class_counts)

194 

195 

class DataFrameSplitter(ABC):
    """
    Abstraction for splitting a data frame into two parts according to a fractional size.
    """

    @abstractmethod
    def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[Sequence[int], Sequence[int]]:
        """
        :param df: the data frame to split
        :param fractional_size_of_first_set: the desired fractional size of the first set
        :return: a pair (indices of first set, indices of second set) of positional row indices
        """

    @staticmethod
    def split_with_indices(df: pd.DataFrame, indices_pair: Tuple[Sequence[int], Sequence[int]]) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        :param df: the data frame to split
        :param indices_pair: a pair of positional row index sequences
        :return: the pair of sub-frames selected by the index pair
        """
        first_indices, second_indices = indices_pair
        return df.iloc[first_indices], df.iloc[second_indices]

    def split(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        :param df: the data frame to split
        :param fractional_size_of_first_set: the desired fractional size of the first set
        :return: the pair (first set, second set) of data frames
        """
        indices_pair = self.compute_split_indices(df, fractional_size_of_first_set)
        return self.split_with_indices(df, indices_pair)

210 

211 

class DataFrameSplitterFractional(DataFrameSplitter):
    """
    Splits a data frame into two consecutive (optionally shuffled) parts of the given relative sizes.
    """
    def __init__(self, shuffle=False, random_seed=42):
        """
        :param shuffle: whether to randomly permute the rows before splitting
        :param random_seed: the random seed used for the permutation (when shuffle is enabled)
        """
        self.randomSeed = random_seed
        self.shuffle = shuffle

    def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[Sequence[int], Sequence[int]]:
        num_rows = df.shape[0]
        num_first = int(num_rows * fractional_size_of_first_set)
        if self.shuffle:
            # seeded RandomState keeps the permutation reproducible without touching global state
            indices = np.random.RandomState(self.randomSeed).permutation(num_rows)
        else:
            indices = list(range(num_rows))
        return indices[:num_first], indices[num_first:]

228 

229 

class DataFrameSplitterColumnEquivalenceClass(DataFrameSplitter):
    """
    Performs a split that keeps together data points/rows that have the same value in a given column, i.e.
    with respect to that column, the items having the same values are viewed as a unit; they form an equivalence class, and all
    data points belonging to the same class are either in the first set or the second set.

    The split is performed at the level of unique items in the column, i.e. the given fraction of equivalence
    classes will end up in the first set and the rest in the second set.

    The list of unique items in the column can be shuffled before applying the split. If no shuffling is applied,
    the original order in the data frame is maintained, and if the items were grouped by equivalence class in the
    original data frame, the split will correspond to a fractional split without shuffling where the split boundary
    is adjusted to not separate an equivalence class.
    """
    def __init__(self, column: str, shuffle=True, random_seed=42):
        """
        :param column: the column which defines the equivalence classes (groups of data points/rows that must not be separated)
        :param shuffle: whether to shuffle the list of unique values in the given column before applying the split
        :param random_seed: the random seed used for shuffling (when shuffle is enabled)
        """
        self.column = column
        self.shuffle = shuffle
        self.random_seed = random_seed

    def compute_split_indices(self, df: pd.DataFrame, fractional_size_of_first_set: float) -> Tuple[Sequence[int], Sequence[int]]:
        values = list(df[self.column].unique())
        if self.shuffle:
            # dedicated Random instance: reproducible and does not disturb the global random state
            rng = random.Random(self.random_seed)
            rng.shuffle(values)

        num_items_in_first_set = round(fractional_size_of_first_set * len(values))
        first_set_values = set(values[:num_items_in_first_set])

        first_set_indices = []
        second_set_indices = []
        # fix: iterate the column's values positionally instead of itertuples()+getattr, which
        # breaks for column names that are not valid Python identifiers (pandas renames them)
        for i, value in enumerate(df[self.column]):
            if value in first_set_values:
                first_set_indices.append(i)
            else:
                second_set_indices.append(i)
        return first_set_indices, second_set_indices