Coverage for src/sensai/sklearn/sklearn_base.py: 72%

218 statements  


import copy
import logging
import re
from abc import ABC, abstractmethod
from typing import List, Any, Dict, Optional

import numpy as np
import pandas as pd
from sklearn import compose

from ..feature_importance import FeatureImportanceProvider
from ..util.pickle import setstate
from ..util.string import dict_string
from ..vector_model import VectorRegressionModel, VectorClassificationModel

log = logging.getLogger(__name__)


def create_sklearn_model(model_constructor, model_args, output_transformer=None):
    model = model_constructor(**model_args)
    if output_transformer is not None:
        model = compose.TransformedTargetRegressor(regressor=model, transformer=output_transformer)
    return model


def str_sklearn_model(model):
    """
    Creates a cleaned string representation of the model with line breaks and indentations removed

    :param model: the sklearn model for which to generate the cleaned string representation
    :return: the string representation
    """
    return re.sub(r",\s*", ", ", str(model))


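# Illustrative sketch (not part of the original module): how the two helpers above might be used.
# LinearRegression and StandardScaler are stand-ins chosen purely for illustration.
def _example_create_and_str_model():
    from sklearn.linear_model import LinearRegression
    from sklearn.preprocessing import StandardScaler

    # without an output transformer, the constructor is applied to the arguments directly
    plain = create_sklearn_model(LinearRegression, {"fit_intercept": True})
    assert isinstance(plain, LinearRegression)

    # with an output transformer, the model is wrapped in a TransformedTargetRegressor
    wrapped = create_sklearn_model(LinearRegression, {}, output_transformer=StandardScaler())
    assert isinstance(wrapped, compose.TransformedTargetRegressor)

    # str_sklearn_model collapses any line breaks/indentation in the repr into simple ", " separators
    return str_sklearn_model(wrapped)
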

def _apply_sklearn_input_transformer(inputs: pd.DataFrame, sklearn_input_transformer: Optional, fit: bool) -> pd.DataFrame:
    if sklearn_input_transformer is None:
        return inputs
    else:
        input_values = inputs.values
        shape_before = input_values.shape
        if fit:
            input_values = sklearn_input_transformer.fit_transform(input_values)
        else:
            input_values = sklearn_input_transformer.transform(input_values)
        if input_values.shape != shape_before:
            raise Exception("sklearnInputTransformer changed the shape of the input, which is unsupported. "
                "Consider using a DFTSkLearnTransformer as a feature transformer instead.")
        return pd.DataFrame(input_values, index=inputs.index, columns=inputs.columns)


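# Illustrative sketch (not part of the original module): _apply_sklearn_input_transformer preserves the
# DataFrame's index and columns and rejects transformers that change the input shape. StandardScaler,
# PCA and the sample data are stand-ins chosen purely for illustration.
def _example_apply_input_transformer():
    from sklearn.decomposition import PCA
    from sklearn.preprocessing import StandardScaler

    df = pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": [4.0, 5.0, 6.0]}, index=["r1", "r2", "r3"])

    # a shape-preserving transformer is applied and the frame structure is retained
    scaled = _apply_sklearn_input_transformer(df, StandardScaler(), fit=True)
    assert list(scaled.columns) == ["a", "b"] and list(scaled.index) == ["r1", "r2", "r3"]

    # a dimensionality-reducing transformer changes the shape and is therefore rejected
    try:
        _apply_sklearn_input_transformer(df, PCA(n_components=1), fit=True)
    except Exception as e:
        return e  # the message suggests using a DFTSkLearnTransformer as a feature transformer instead
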

class AbstractSkLearnVectorRegressionModel(VectorRegressionModel, ABC):
    """
    Base class for models built upon scikit-learn's model implementations
    """
    log = log.getChild(__qualname__)

    def __init__(self, model_constructor, **model_args):
        """
        :param model_constructor: the sklearn model constructor
        :param model_args: arguments to be passed to the sklearn model constructor
        """
        super().__init__()
        self.sklearnInputTransformer = None
        self.sklearnOutputTransformer = None
        self.modelConstructor = model_constructor
        self.modelArgs = model_args
        self.fitArgs = {}

    def _tostring_excludes(self) -> List[str]:
        return super()._tostring_excludes() + ["sklearnInputTransformer", "sklearnOutputTransformer", "modelConstructor", "modelArgs"]

    def with_sklearn_input_transformer(self, sklearn_input_transformer) -> __qualname__:
        """
        :param sklearn_input_transformer: an optional sklearn preprocessor for normalising/scaling inputs
        :return: self
        """
        self.sklearnInputTransformer = sklearn_input_transformer
        return self

    def with_sklearn_output_transformer(self, sklearn_output_transformer):
        """
        :param sklearn_output_transformer: an optional sklearn preprocessor for normalising/scaling outputs
        :return: self
        """
        self.sklearnOutputTransformer = sklearn_output_transformer
        return self

    def _transform_input(self, inputs: pd.DataFrame, fit=False) -> pd.DataFrame:
        return _apply_sklearn_input_transformer(inputs, self.sklearnInputTransformer, fit)

    def _update_model_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        """
        Designed to be overridden in order to make input data-specific changes to modelArgs

        :param inputs: the training input data
        :param outputs: the training output data
        """
        pass

    def _update_fit_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        """
        Designed to be overridden in order to make input data-specific changes to fitArgs (arguments to be passed to the
        underlying model's fit method)

        :param inputs: the training input data
        :param outputs: the training output data
        """
        pass

    def _fit(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        inputs = self._transform_input(inputs, fit=True)
        self._update_model_args(inputs, outputs)
        self._update_fit_args(inputs, outputs)
        self._fit_sklearn(inputs, outputs)

    @abstractmethod
    def _fit_sklearn(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        pass

    def _predict(self, x: pd.DataFrame):
        inputs = self._transform_input(x)
        return self._predict_sklearn(inputs)

    @abstractmethod
    def _predict_sklearn(self, inputs: pd.DataFrame):
        pass


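# Illustrative sketch (not part of the original module): a minimal concrete subclass showing what the
# abstract base above requires, essentially mirroring what the subclasses further below provide.
# _ExampleSingleModelRegression and the use of LinearRegression are assumptions for illustration only.
class _ExampleSingleModelRegression(AbstractSkLearnVectorRegressionModel):
    def __init__(self):
        from sklearn.linear_model import LinearRegression
        super().__init__(LinearRegression, fit_intercept=True)
        self._sklearn_model = None

    def _fit_sklearn(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        # build the underlying estimator from the stored constructor/args and fit it on the single output column
        self._sklearn_model = create_sklearn_model(self.modelConstructor, self.modelArgs,
            output_transformer=self.sklearnOutputTransformer)
        self._sklearn_model.fit(inputs, np.ravel(outputs.values), **self.fitArgs)

    def _predict_sklearn(self, inputs: pd.DataFrame) -> pd.DataFrame:
        return pd.DataFrame(self._sklearn_model.predict(inputs), columns=self.get_model_output_variable_names())

# An instance could then be configured fluently, e.g.
#     _ExampleSingleModelRegression().with_sklearn_input_transformer(StandardScaler())
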

class AbstractSkLearnMultipleOneDimVectorRegressionModel(AbstractSkLearnVectorRegressionModel, ABC):
    """
    Base class for models which use several sklearn models of the same type with a single
    output dimension to create a multi-dimensional model (for the case where there is more than one output dimension)
    """
    def __init__(self, model_constructor, **model_args):
        super().__init__(model_constructor, **model_args)
        self.models = {}

    def _tostring_excludes(self) -> List[str]:
        return super()._tostring_excludes() + ["models"]

    def _tostring_additional_entries(self) -> Dict[str, Any]:
        d = super()._tostring_additional_entries()
        if len(self.models) > 0:
            d["model[0]"] = str_sklearn_model(next(iter(self.models.values())))
        else:
            d["modelConstructor"] = f"{self.modelConstructor.__name__}({dict_string(self.modelArgs)})"
        return d

    def _fit_sklearn(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        for predictedVarName in outputs.columns:
            log.info(f"Fitting model for output variable '{predictedVarName}'")
            model = create_sklearn_model(self.modelConstructor,
                self.modelArgs,
                output_transformer=copy.deepcopy(self.sklearnOutputTransformer))
            model.fit(inputs, outputs[predictedVarName], **self.fitArgs)
            self.models[predictedVarName] = model

    def _predict_sklearn(self, inputs: pd.DataFrame) -> pd.DataFrame:
        results = {}
        for varName in self.models:
            results[varName] = self._predict_sklearn_single_model(self.models[varName], inputs)
        return pd.DataFrame(results)

    def _predict_sklearn_single_model(self, model, inputs: pd.DataFrame) -> np.ndarray:
        return model.predict(inputs)

    def get_sklearn_model(self, predicted_var_name=None):
        if predicted_var_name is None:
            if len(self.models) > 1:
                raise ValueError(f"Must provide predicted variable name (one of {self.models.keys()})")
            return next(iter(self.models.values()))
        return self.models[predicted_var_name]


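# Illustrative sketch (not part of the original module): the one-model-per-output strategy fits an
# independent sklearn model for each target column. LinearRegression, the sample data, the direct
# instantiation of the base class and the calls to the internal _fit_sklearn/_predict_sklearn hooks
# are assumptions for illustration only; in normal use, fitting is driven by the enclosing vector
# model's fit method.
def _example_multiple_one_dim_regression():
    from sklearn.linear_model import LinearRegression

    inputs = pd.DataFrame({"x": [0.0, 1.0, 2.0, 3.0]})
    outputs = pd.DataFrame({"y1": [0.0, 1.0, 2.0, 3.0], "y2": [1.0, 3.0, 5.0, 7.0]})

    model = AbstractSkLearnMultipleOneDimVectorRegressionModel(LinearRegression)
    model._fit_sklearn(inputs, outputs)

    # one underlying sklearn model is kept per output column
    assert set(model.models.keys()) == {"y1", "y2"}
    assert isinstance(model.get_sklearn_model("y1"), LinearRegression)

    # predictions come back as a DataFrame with one column per target variable
    return model._predict_sklearn(inputs)
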

class AbstractSkLearnMultiDimVectorRegressionModel(AbstractSkLearnVectorRegressionModel, ABC):
    """
    Base class for models which use a single sklearn model with multiple output dimensions to create the multi-dimensional model
    """
    def __init__(self, model_constructor, **model_args):
        super().__init__(model_constructor, **model_args)
        self.model = None

    def _tostring_excludes(self) -> List[str]:
        return super()._tostring_excludes() + ["model"]

    def _tostring_additional_entries(self) -> Dict[str, Any]:
        d = super()._tostring_additional_entries()
        if self.model is not None:
            d["model"] = str_sklearn_model(self.model)
        else:
            d["modelConstructor"] = f"{self.modelConstructor.__name__}({dict_string(self.modelArgs)})"
        return d

    def _fit_sklearn(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        if len(outputs.columns) > 1:
            log.info(f"Fitting a single multi-dimensional model for all {len(outputs.columns)} output dimensions")
        self.model = create_sklearn_model(self.modelConstructor, self.modelArgs, output_transformer=self.sklearnOutputTransformer)
        output_values = outputs.values
        if output_values.shape[1] == 1:  # for 1D output, shape must be (numSamples,) rather than (numSamples, 1)
            output_values = np.ravel(output_values)
        self.model.fit(inputs, output_values, **self.fitArgs)

    def _predict_sklearn(self, inputs: pd.DataFrame) -> pd.DataFrame:
        y = self.model.predict(inputs)
        return pd.DataFrame(y, columns=self.get_model_output_variable_names())


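# Illustrative sketch (not part of the original module): the multi-dimensional strategy fits a single
# sklearn model covering all target columns at once. LinearRegression (which natively supports
# multi-output regression), the sample data, the direct instantiation and the direct _fit_sklearn call
# are assumptions for illustration only.
def _example_multi_dim_regression():
    from sklearn.linear_model import LinearRegression

    inputs = pd.DataFrame({"x": [0.0, 1.0, 2.0, 3.0]})
    outputs = pd.DataFrame({"y1": [0.0, 1.0, 2.0, 3.0], "y2": [1.0, 3.0, 5.0, 7.0]})

    model = AbstractSkLearnMultiDimVectorRegressionModel(LinearRegression)
    model._fit_sklearn(inputs, outputs)

    # a single underlying estimator predicts both outputs, returning an array of shape (numSamples, 2)
    assert model.model.predict(inputs).shape == (4, 2)
    return model.model
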

class AbstractSkLearnVectorClassificationModel(VectorClassificationModel, ABC):
    def __init__(self, model_constructor, use_balanced_class_weights=False, use_label_encoding=False, **model_args):
        """
        :param model_constructor: the sklearn model constructor
        :param model_args: arguments to be passed to the sklearn model constructor
        :param use_balanced_class_weights: whether to compute class weights from the training data and apply the corresponding weight to
            each data point such that the sum of weights for all classes is equal. This is achieved by applying a weight proportional
            to the reciprocal frequency of the class in the (training) data. We scale weights such that the smallest weight (of the
            largest class) is 1, ensuring that weight counts still reasonably correspond to data point counts.
            Note that weighted data points may not be supported for all types of models.
        :param use_label_encoding: whether to replace original class labels with 0-based index in sorted list of labels (a.k.a. label
            encoding), which is required by some sklearn-compatible implementations (particularly xgboost)
        """
        super().__init__()
        self.modelConstructor = model_constructor
        self.sklearnInputTransformer = None
        self.modelArgs = model_args
        self.fitArgs = {}
        self.useBalancedClassWeights = use_balanced_class_weights
        self.useLabelEncoding = use_label_encoding
        self.model = None

    def __setstate__(self, state):
        setstate(AbstractSkLearnVectorClassificationModel, self, state, new_optional_properties=["labelEncoder"],
            new_default_properties={"useComputedClassWeights": False, "useLabelEncoder": False},
            renamed_properties={"useComputedClassWeights": "useBalancedClassWeights"},
            removed_properties=["sklearnOutputTransformer"])

    def _tostring_excludes(self) -> List[str]:
        return super()._tostring_excludes() + ["modelConstructor", "sklearnInputTransformer", "sklearnOutputTransformer",
            "modelArgs", "model"]

    def _tostring_additional_entries(self) -> Dict[str, Any]:
        d = super()._tostring_additional_entries()
        if self.model is None:
            d["modelConstructor"] = f"{self.modelConstructor.__name__}({dict_string(self.modelArgs)})"
        else:
            d["model"] = str_sklearn_model(self.model)
        return d

    def with_sklearn_input_transformer(self, sklearn_input_transformer) -> __qualname__:
        """
        :param sklearn_input_transformer: an optional sklearn preprocessor for transforming inputs
        :return: self
        """
        self.sklearnInputTransformer = sklearn_input_transformer
        return self

    def _update_model_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        """
        Designed to be overridden in order to make input data-specific changes to modelArgs

        :param inputs: the training input data
        :param outputs: the training output data
        """
        pass

    def _update_fit_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        """
        Designed to be overridden in order to make input data-specific changes to fitArgs (arguments to be passed to the
        underlying model's fit method)

        :param inputs: the training input data
        :param outputs: the training output data
        """
        pass

    def _fit_classifier(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        inputs = self._transform_input(inputs, fit=True)
        self._update_model_args(inputs, outputs)
        self._update_fit_args(inputs, outputs)
        self.model = create_sklearn_model(self.modelConstructor, self.modelArgs)
        log.info(f"Fitting sklearn classifier of type {self.model.__class__.__name__}")
        kwargs = dict(self.fitArgs)
        if self.useBalancedClassWeights:
            class2weight = self._compute_class_weights(outputs)
            classes = outputs.iloc[:, 0]
            weights = np.array([class2weight[cls] for cls in classes])
            weights = weights / np.min(weights)
            kwargs["sample_weight"] = weights

        output_values = np.ravel(outputs.values)
        if self.useLabelEncoding:
            output_values = self._encode_labels(output_values)
        self._fit_sklearn_classifier(inputs, output_values, kwargs)

    def _fit_sklearn_classifier(self, inputs: pd.DataFrame, output_values: np.ndarray, kwargs: Dict[str, Any]):
        self.model.fit(inputs, output_values, **kwargs)

    def _transform_input(self, inputs: pd.DataFrame, fit=False) -> pd.DataFrame:
        return _apply_sklearn_input_transformer(inputs, self.sklearnInputTransformer, fit)

    def _encode_labels(self, y: np.ndarray):
        d = {l: i for i, l in enumerate(self._labels)}
        vfn = np.vectorize(lambda x: d[x])
        return vfn(y)

    def _decode_labels(self, y: np.ndarray):
        d = dict(enumerate(self._labels))
        vfn = np.vectorize(lambda x: d[x])
        return vfn(y)

    def _predict_sklearn(self, input_values):
        return self.model.predict(input_values)

    def _predict(self, x: pd.DataFrame):
        input_values = self._transform_input(x)
        y = self._predict_sklearn(input_values)
        if self.useLabelEncoding:
            y = self._decode_labels(y)
        return pd.DataFrame(y, columns=self._predictedVariableNames)

    def _predict_class_probabilities(self, x: pd.DataFrame):
        input_values = self._transform_input(x)
        y = self.model.predict_proba(input_values)
        return pd.DataFrame(y, columns=self._labels)

    def get_params(self, deep=True):
        return self.model.get_params(deep=deep)

    def set_params(self, **params):
        self.model.set_params(**params)

    # noinspection PyMethodMayBeStatic
    def _compute_class_weights(self, outputs: pd.DataFrame):
        """
        :param outputs: the output data frame containing the class labels as the first column
        :return: the dictionary of class weights mapping class to weight value
        """
        classes: pd.Series = outputs.iloc[:, 0]
        counts = classes.value_counts()
        rfreqs = counts / counts.sum()
        weights: pd.Series = 1.0 / rfreqs
        return weights.to_dict()


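# Illustrative sketch (not part of the original module): the balanced class weights described in the
# constructor docstring are reciprocal relative class frequencies, which _fit_classifier then rescales
# so that the largest class receives weight 1. LogisticRegression, the sample data, the direct
# instantiation and the direct _compute_class_weights call are assumptions for illustration only.
def _example_balanced_class_weights():
    from sklearn.linear_model import LogisticRegression

    outputs = pd.DataFrame({"label": ["a", "a", "a", "b"]})  # class "a" is three times as frequent as "b"
    classifier = AbstractSkLearnVectorClassificationModel(LogisticRegression, use_balanced_class_weights=True)

    class2weight = classifier._compute_class_weights(outputs)
    assert np.isclose(class2weight["a"], 4 / 3) and class2weight["b"] == 4.0  # reciprocal relative frequencies

    # _fit_classifier additionally divides by the minimum weight, so the largest class ends up with weight 1
    weights = np.array([class2weight[c] for c in outputs.iloc[:, 0]])
    return weights / np.min(weights)  # approximately [1., 1., 1., 3.]
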

def _get_model_feature_importance_vector(model):
    cand_attributes = ("feature_importances_", "coef_")
    for attr in cand_attributes:
        if hasattr(model, attr):
            importance_values = getattr(model, attr)
            if attr == "coef_":
                importance_values = np.abs(importance_values)  # for coefficients in linear models, use the absolute values
            return importance_values
    raise ValueError(f"Model {model} has none of the attributes {cand_attributes}")


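# Illustrative sketch (not part of the original module): the helper above reads feature_importances_
# where available and otherwise falls back to the absolute values of coef_. LinearRegression and the
# sample data are stand-ins chosen purely for illustration.
def _example_feature_importance_vector():
    from sklearn.linear_model import LinearRegression

    x = pd.DataFrame({"a": [0.0, 1.0, 2.0, 3.0], "b": [1.0, 0.0, 3.0, 5.0]})
    y = 2.0 * x["a"] - 1.0 * x["b"]  # target constructed with known coefficients

    linreg = LinearRegression().fit(x, y)
    importances = _get_model_feature_importance_vector(linreg)  # abs(coef_), approximately [2.0, 1.0]
    assert importances.shape == (2,)
    return importances
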

class FeatureImportanceProviderSkLearnRegressionMultipleOneDim(FeatureImportanceProvider):
    def get_feature_importance_dict(self) -> Dict[str, Dict[str, float]]:
        self: AbstractSkLearnMultipleOneDimVectorRegressionModel
        return {targetFeature: dict(zip(self._modelInputVariableNames, _get_model_feature_importance_vector(model)))
            for targetFeature, model in self.models.items()}


class FeatureImportanceProviderSkLearnRegressionMultiDim(FeatureImportanceProvider):
    def get_feature_importance_dict(self) -> Dict[str, float]:
        self: AbstractSkLearnMultiDimVectorRegressionModel
        return dict(zip(self._modelInputVariableNames, _get_model_feature_importance_vector(self.model)))


class FeatureImportanceProviderSkLearnClassification(FeatureImportanceProvider):
    def get_feature_importance_dict(self) -> Dict[str, float]:
        self: AbstractSkLearnVectorClassificationModel
        return dict(zip(self._modelInputVariableNames, _get_model_feature_importance_vector(self.model)))
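

# Illustrative sketch (not part of the original module): the mixins above are intended to be combined
# with a concrete model class via multiple inheritance, so that get_feature_importance_dict can read
# the fitted sklearn model(s). _ExampleImportanceAwareRegression and the use of RandomForestRegressor
# are assumptions for illustration only.
class _ExampleImportanceAwareRegression(AbstractSkLearnMultipleOneDimVectorRegressionModel,
        FeatureImportanceProviderSkLearnRegressionMultipleOneDim):
    def __init__(self, **model_args):
        from sklearn.ensemble import RandomForestRegressor
        super().__init__(RandomForestRegressor, **model_args)

# After fitting, get_feature_importance_dict() maps each target variable to a dictionary of feature
# importances keyed by input variable name (here taken from the random forest's feature_importances_).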