Coverage for src/sensai/sklearn/sklearn_base.py: 71%

244 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-29 18:29 +0000

1import copy 

2import logging 

3import re 

4from abc import ABC, abstractmethod 

5from dataclasses import dataclass 

6from typing import List, Any, Dict, Optional 

7 

8import numpy as np 

9import pandas as pd 

10from sklearn import compose 

11 

12from ..feature_importance import FeatureImportanceProvider 

13from ..util.pickle import setstate 

14from ..util.string import dict_string 

15from ..vector_model import VectorRegressionModel, VectorClassificationModel 

16 

# module-level logger, named after this module; used directly by the fitting code below
log = logging.getLogger(__name__)

18 

19 

def create_sklearn_model(model_constructor, model_args, output_transformer=None):
    """
    Instantiates a model via the given constructor, optionally wrapping it such that target values are transformed

    :param model_constructor: the callable which creates the model instance
    :param model_args: dictionary of keyword arguments to pass on to `model_constructor`
    :param output_transformer: if not None, the created model is wrapped in a `compose.TransformedTargetRegressor`
        which uses this object as its transformer
    :return: the created model (wrapped if an output transformer was given)
    """
    base_model = model_constructor(**model_args)
    if output_transformer is None:
        return base_model
    return compose.TransformedTargetRegressor(regressor=base_model, transformer=output_transformer)

25 

26 

def str_sklearn_model(model):
    """
    Generates a compact string representation of the model in which every comma and the whitespace
    following it (including line breaks and indentation) is replaced by a comma and a single space

    :param model: the sklearn model for which to generate the cleaned string representation
    :return: the string representation
    """
    raw_representation = str(model)
    comma_with_trailing_whitespace = re.compile(r",\s*")
    return comma_with_trailing_whitespace.sub(", ", raw_representation)

35 

36 

def _apply_sklearn_input_transformer(inputs: pd.DataFrame, sklearn_input_transformer: Optional[Any], fit: bool) -> pd.DataFrame:
    """
    Applies an (optional) sklearn-style transformer to the values of the given data frame

    :param inputs: the data frame whose values are to be transformed
    :param sklearn_input_transformer: the transformer to apply, which must provide the methods `fit_transform` and `transform`;
        if None, the inputs are returned unchanged
    :param fit: whether to fit the transformer on the given data (`fit_transform`) or to apply it as is (`transform`)
    :return: a data frame containing the transformed values, with the same index and columns as `inputs`
    :raises ValueError: if the transformer changes the shape of the data
    """
    if sklearn_input_transformer is None:
        return inputs
    input_values = inputs.values
    shape_before = input_values.shape
    if fit:
        input_values = sklearn_input_transformer.fit_transform(input_values)
    else:
        input_values = sklearn_input_transformer.transform(input_values)
    # the original index/columns are reattached below, so the shape must be unchanged
    if input_values.shape != shape_before:
        raise ValueError("sklearnInputTransformer changed the shape of the input, which is unsupported. "
            "Consider using a DFTSkLearnTransformer as a feature transformer instead.")
    return pd.DataFrame(input_values, index=inputs.index, columns=inputs.columns)

51 

52 

class ActualFitParams:
    """
    Simple container for the data and keyword arguments that are ultimately passed to a model's `fit` method
    """
    def __init__(self, inputs, outputs, kwargs: Dict[str, Any]):
        """
        :param inputs: the input data to fit on
        :param outputs: the output data to fit on
        :param kwargs: additional keyword arguments for the `fit` call
        """
        self.inputs, self.outputs, self.kwargs = inputs, outputs, kwargs

58 

59 

class AbstractSkLearnVectorRegressionModel(VectorRegressionModel, ABC):
    """
    Abstract foundation for regression models which delegate to scikit-learn model implementations
    """
    log = log.getChild(__qualname__)

    def __init__(self, model_constructor, **model_args):
        """
        :param model_constructor: the sklearn model constructor
        :param model_args: arguments to be passed to the sklearn model constructor
        """
        super().__init__()
        self.sklearnInputTransformer = None
        self.sklearnOutputTransformer = None
        self.modelConstructor = model_constructor
        self.modelArgs = model_args
        self.fitArgs = {}

    def _tostring_excludes(self) -> List[str]:
        excluded_here = ["sklearnInputTransformer", "sklearnOutputTransformer", "modelConstructor", "modelArgs"]
        return super()._tostring_excludes() + excluded_here

    def with_sklearn_input_transformer(self, sklearn_input_transformer) -> __qualname__:
        """
        Sets the transformer which is applied to the inputs before they are passed to the sklearn model

        :param sklearn_input_transformer: an optional sklearn preprocessor for normalising/scaling inputs
        :return: self
        """
        self.sklearnInputTransformer = sklearn_input_transformer
        return self

    def with_sklearn_output_transformer(self, sklearn_output_transformer):
        """
        Sets the transformer which is to be applied to the outputs

        :param sklearn_output_transformer: an optional sklearn preprocessor for normalising/scaling outputs
        :return: self
        """
        self.sklearnOutputTransformer = sklearn_output_transformer
        return self

    def _transform_input(self, inputs: pd.DataFrame, fit=False) -> pd.DataFrame:
        # delegates to the module-level helper, which is a no-op if no input transformer was configured
        transformer = self.sklearnInputTransformer
        return _apply_sklearn_input_transformer(inputs, transformer, fit)

    def _update_model_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        """
        Hook which subclasses may override in order to adapt modelArgs based on the training data

        :param inputs: the training input data
        :param outputs: the training output data
        """
        pass

    def _update_fit_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        """
        Hook which subclasses may override in order to adapt fitArgs (the arguments to be passed to the
        underlying model's `fit` method) based on the training data

        :param inputs: the training input data
        :param outputs: the training output data
        """
        pass

    def _compute_actual_fit_params(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[pd.Series] = None) -> ActualFitParams:
        """
        Assembles the parameters for the model's `fit` call. The keyword arguments are a copy of fitArgs, extended by
        transient entries (currently the sample weights) which shall not be stored along with the model.

        :param inputs: the training input data
        :param outputs: the training output data
        :param weights: optional sample weights; passed on as `sample_weight` only if sample weights are supported
        :return: the parameter object holding the inputs, the outputs and the keyword arguments to be passed to `fit`
        """
        fit_kwargs = dict(self.fitArgs)  # copy, such that the persistent fitArgs remain untouched
        if weights is not None:
            self._warn_sample_weights_unsupported(self.is_sample_weight_supported(), weights)
            if self.is_sample_weight_supported():
                fit_kwargs["sample_weight"] = weights
        return ActualFitParams(inputs, outputs, fit_kwargs)

    @abstractmethod
    def is_sample_weight_supported(self) -> bool:
        pass

    def _fit(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[pd.Series] = None):
        # the input transformer is fitted here; the update hooks receive the transformed inputs
        transformed_inputs = self._transform_input(inputs, fit=True)
        self._update_model_args(transformed_inputs, outputs)
        self._update_fit_args(transformed_inputs, outputs)
        fit_params = self._compute_actual_fit_params(transformed_inputs, outputs, weights=weights)
        self._fit_sklearn(fit_params)

    @abstractmethod
    def _fit_sklearn(self, params: ActualFitParams):
        pass

    def _predict(self, x: pd.DataFrame):
        return self._predict_sklearn(self._transform_input(x))

    @abstractmethod
    def _predict_sklearn(self, inputs: pd.DataFrame):
        pass

157 

158 

class AbstractSkLearnMultipleOneDimVectorRegressionModel(AbstractSkLearnVectorRegressionModel, ABC):
    """
    Base class for models which use several sklearn models of the same type with a single
    output dimension to create a multi-dimensional model (for the case where there is more than one output dimension)
    """
    def __init__(self, model_constructor, **model_args):
        """
        :param model_constructor: the sklearn model constructor
        :param model_args: arguments to be passed to the sklearn model constructor
        """
        super().__init__(model_constructor, **model_args)
        self.models = {}  # maps the name of each predicted variable to the model fitted for it

    def _tostring_excludes(self) -> List[str]:
        return super()._tostring_excludes() + ["models"]

    def _tostring_additional_entries(self) -> Dict[str, Any]:
        d = super()._tostring_additional_entries()
        if len(self.models) > 0:
            # all models are created from the same constructor and arguments, so the first one is shown as representative
            d["model[0]"] = str_sklearn_model(next(iter(self.models.values())))
        else:
            d["modelConstructor"] = f"{self.modelConstructor.__name__}({dict_string(self.modelArgs)})"
        return d

    def _fit_sklearn(self, params: ActualFitParams):
        """
        Fits one model per output column

        :param params: the inputs, outputs and keyword arguments for the `fit` calls
        """
        # discard models from any previous fit, such that re-fitting with different output columns
        # does not leave stale models behind (which would otherwise still be used for prediction)
        self.models = {}
        for predictedVarName in params.outputs.columns:
            log.info(f"Fitting model for output variable '{predictedVarName}'")
            model = create_sklearn_model(self.modelConstructor,
                self.modelArgs,
                output_transformer=copy.deepcopy(self.sklearnOutputTransformer))
            model.fit(params.inputs, params.outputs[predictedVarName], **params.kwargs)
            self.models[predictedVarName] = model

    def _predict_sklearn(self, inputs: pd.DataFrame) -> pd.DataFrame:
        """
        :param inputs: the (already transformed) inputs
        :return: a data frame with one column of predictions per fitted model
        """
        results = {var_name: self._predict_sklearn_single_model(model, inputs) for var_name, model in self.models.items()}
        return pd.DataFrame(results)

    def _predict_sklearn_single_model(self, model, inputs: pd.DataFrame) -> np.ndarray:
        return model.predict(inputs)

    def get_sklearn_model(self, predicted_var_name=None):
        """
        :param predicted_var_name: the name of the predicted variable whose model to retrieve; may be None
            if there is only a single model
        :return: the model for the given variable
        :raises ValueError: if no name is given and there is not exactly one model
        :raises KeyError: if there is no model for the given name
        """
        if predicted_var_name is None:
            if len(self.models) > 1:
                raise ValueError(f"Must provide predicted variable name (one of {self.models.keys()})")
            if len(self.models) == 0:
                # avoid leaking a StopIteration from next() below
                raise ValueError("No model is available; the model has not been fitted")
            return next(iter(self.models.values()))
        return self.models[predicted_var_name]

203 

204 

class AbstractSkLearnMultiDimVectorRegressionModel(AbstractSkLearnVectorRegressionModel, ABC):
    """
    Base class for models where one sklearn model, which handles all output dimensions at once, constitutes the
    (potentially multi-dimensional) model
    """
    def __init__(self, model_constructor, **model_args):
        """
        :param model_constructor: the sklearn model constructor
        :param model_args: arguments to be passed to the sklearn model constructor
        """
        super().__init__(model_constructor, **model_args)
        self.model = None  # set when the model is fitted

    def _tostring_excludes(self) -> List[str]:
        return super()._tostring_excludes() + ["model"]

    def _tostring_additional_entries(self) -> Dict[str, Any]:
        entries = super()._tostring_additional_entries()
        if self.model is None:
            # no model has been created yet: describe how it would be constructed
            entries["modelConstructor"] = f"{self.modelConstructor.__name__}({dict_string(self.modelArgs)})"
        else:
            entries["model"] = str_sklearn_model(self.model)
        return entries

    def _fit_sklearn(self, params: ActualFitParams):
        """
        Creates the model and fits it on all output columns at once

        :param params: the inputs, outputs and keyword arguments for the `fit` call
        """
        if len(params.outputs.columns) > 1:
            log.info(f"Fitting a single multi-dimensional model for all {len(params.outputs.columns)} output dimensions")
        self.model = create_sklearn_model(self.modelConstructor, self.modelArgs, output_transformer=self.sklearnOutputTransformer)
        targets = params.outputs.values
        is_one_dimensional = targets.shape[1] == 1
        if is_one_dimensional:
            # for 1D output, shape must be (numSamples,) rather than (numSamples, 1)
            targets = np.ravel(targets)
        self.model.fit(params.inputs, targets, **params.kwargs)

    def _predict_sklearn(self, inputs: pd.DataFrame) -> pd.DataFrame:
        predictions = self.model.predict(inputs)
        return pd.DataFrame(predictions, columns=self.get_model_output_variable_names())

236 

237 

class AbstractSkLearnVectorClassificationModel(VectorClassificationModel, ABC):
    """
    Base class for classification models built upon scikit-learn-compatible classifier implementations
    """
    def __init__(self, model_constructor, use_balanced_class_weights=False, use_label_encoding=False, **model_args):
        """
        :param model_constructor: the sklearn model constructor
        :param model_args: arguments to be passed to the sklearn model constructor
        :param use_balanced_class_weights: whether to compute class weights from the training data and apply the corresponding weight to
            each data point such that the sum of weights for all classes is equal. This is achieved by applying a weight proportional
            to the reciprocal frequency of the class in the (training) data. We scale weights such that the smallest weight (of the
            largest class) is 1, ensuring that weight counts still reasonably correspond to data point counts.
            Note that weighted data points may not be supported for all types of models.
        :param use_label_encoding: whether to replace original class labels with 0-based index in sorted list of labels (a.k.a. label
            encoding), which is required by some sklearn-compatible implementations (particularly xgboost)
        """
        super().__init__()
        self.modelConstructor = model_constructor
        self.sklearnInputTransformer = None
        self.modelArgs = model_args
        self.fitArgs = {}
        self.useBalancedClassWeights = use_balanced_class_weights
        self.useLabelEncoding = use_label_encoding
        self.model = None

    def __setstate__(self, state):
        # The legacy entries are retained for backward compatibility. Defaults for the attribute names that
        # the current code actually reads (useBalancedClassWeights, useLabelEncoding) are supplied in addition,
        # such that instances restored from old states do not lack these attributes.
        # NOTE(review): assumes that `setstate` applies a default only if the key is absent from the state - confirm
        setstate(AbstractSkLearnVectorClassificationModel, self, state, new_optional_properties=["labelEncoder"],
            new_default_properties={"useComputedClassWeights": False, "useLabelEncoder": False,
                "useBalancedClassWeights": False, "useLabelEncoding": False},
            renamed_properties={"useComputedClassWeights": "useBalancedClassWeights"},
            removed_properties=["sklearnOutputTransformer"])

    def _tostring_excludes(self) -> List[str]:
        return super()._tostring_excludes() + ["modelConstructor", "sklearnInputTransformer", "sklearnOutputTransformer",
            "modelArgs", "model"]

    def _tostring_additional_entries(self) -> Dict[str, Any]:
        d = super()._tostring_additional_entries()
        if self.model is None:
            d["modelConstructor"] = f"{self.modelConstructor.__name__}({dict_string(self.modelArgs)})"
        else:
            d["model"] = str_sklearn_model(self.model)
        return d

    def with_sklearn_input_transformer(self, sklearn_input_transformer) -> __qualname__:
        """
        :param sklearn_input_transformer: an optional sklearn preprocessor for transforming inputs
        :return: self
        """
        self.sklearnInputTransformer = sklearn_input_transformer
        return self

    def _update_model_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        """
        Designed to be overridden in order to make input data-specific changes to modelArgs

        :param inputs: the training input data
        :param outputs: the training output data
        """
        pass

    def _update_fit_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        """
        Designed to be overridden in order to make input data-specific changes to fitArgs (arguments to be passed to the
        underlying model's fit method)

        :param inputs: the training input data
        :param outputs: the training output data
        """
        pass

    @abstractmethod
    def is_sample_weight_supported(self) -> bool:
        pass

    def _fit_classifier(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[pd.Series] = None):
        """
        Creates the classifier and fits it

        :param inputs: the training input data
        :param outputs: the training output data; the class labels are taken from the first column
        :param weights: optional sample weights, which cannot be combined with balanced class weights
        :raises ValueError: if both balanced class weights are enabled and weights are given
        """
        inputs = self._transform_input(inputs, fit=True)
        self._update_model_args(inputs, outputs)
        self._update_fit_args(inputs, outputs)
        self.model = create_sklearn_model(self.modelConstructor, self.modelArgs)
        log.info(f"Fitting sklearn classifier of type {self.model.__class__.__name__}")
        kwargs = dict(self.fitArgs)  # copy, such that the persistent fitArgs remain untouched

        if self.useBalancedClassWeights and weights is not None:
            raise ValueError("Balanced class weights cannot be used in conjunction with user-specified weights")

        if self.useBalancedClassWeights:
            class2weight = self._compute_class_weights(outputs)
            classes = outputs.iloc[:, 0]
            weights = np.array([class2weight[cls] for cls in classes])
            # rescale such that the smallest weight is 1
            weights = weights / np.min(weights)
            kwargs["sample_weight"] = weights

        elif weights is not None:
            self._warn_sample_weights_unsupported(self.is_sample_weight_supported(), weights)
            if self.is_sample_weight_supported():
                kwargs["sample_weight"] = weights

        output_values = np.ravel(outputs.values)
        if self.useLabelEncoding:
            output_values = self._encode_labels(output_values)
        self._fit_sklearn_classifier(inputs, output_values, kwargs)

    def _fit_sklearn_classifier(self, inputs: pd.DataFrame, output_values: np.ndarray, kwargs: Dict[str, Any]):
        self.model.fit(inputs, output_values, **kwargs)

    def _transform_input(self, inputs: pd.DataFrame, fit=False) -> pd.DataFrame:
        return _apply_sklearn_input_transformer(inputs, self.sklearnInputTransformer, fit)

    def _encode_labels(self, y: np.ndarray):
        """
        :param y: an array of class labels
        :return: an array in which each label is replaced by its index in `self._labels`
        """
        d = {l: i for i, l in enumerate(self._labels)}
        vfn = np.vectorize(lambda x: d[x])
        return vfn(y)

    def _decode_labels(self, y: np.ndarray):
        """
        :param y: an array of indices into `self._labels`
        :return: an array of the corresponding class labels
        """
        d = dict(enumerate(self._labels))
        vfn = np.vectorize(lambda x: d[x])
        return vfn(y)

    def _predict_sklearn(self, input_values):
        return self.model.predict(input_values)

    def _predict(self, x: pd.DataFrame):
        input_values = self._transform_input(x)
        y = self._predict_sklearn(input_values)
        if self.useLabelEncoding:
            # the model was fitted on encoded labels (indices), so map predictions back to the original labels
            y = self._decode_labels(y)
        return pd.DataFrame(y, columns=self._predictedVariableNames)

    def _predict_class_probabilities(self, x: pd.DataFrame):
        input_values = self._transform_input(x)
        y = self.model.predict_proba(input_values)
        return pd.DataFrame(y, columns=self._labels)

    def get_params(self, deep=True):
        return self.model.get_params(deep=deep)

    def set_params(self, **params):
        self.model.set_params(**params)

    # noinspection PyMethodMayBeStatic
    def _compute_class_weights(self, outputs: pd.DataFrame):
        """
        :param outputs: the output data frame containing the class labels as the first column
        :return: the dictionary of class weights mapping class to weight value (the reciprocal relative frequency of the class)
        """
        classes: pd.Series = outputs.iloc[:, 0]
        counts = classes.value_counts()
        rfreqs = counts / counts.sum()
        weights: pd.Series = 1.0 / rfreqs
        return weights.to_dict()

385 

386 

def _get_model_feature_importance_vector(model):
    """
    Retrieves the feature importance values from a fitted model

    :param model: the model, which must have either the attribute `feature_importances_` (preferred if both are
        present) or the attribute `coef_`
    :return: the value of `feature_importances_` or, failing that, the absolute values of `coef_`
    :raises ValueError: if the model has neither of the two attributes
    """
    cand_attributes = ("feature_importances_", "coef_")
    if hasattr(model, "feature_importances_"):
        return model.feature_importances_
    if hasattr(model, "coef_"):
        # for coefficients in linear models, use the absolute values
        return np.abs(model.coef_)
    raise ValueError(f"Model {model} has none of the attributes {cand_attributes}")

396 

397 

class FeatureImportanceProviderSkLearnRegressionMultipleOneDim(FeatureImportanceProvider):
    """
    Feature importance provider for use with models that hold one sklearn model per
    predicted variable in the attribute `models` (see the type hint on `self` below)
    """
    def get_feature_importance_dict(self) -> Dict[str, Dict[str, float]]:
        """
        :return: a dictionary which maps each predicted variable name to a dictionary mapping
            input variable names to the importance values of the respective model
        """
        self: AbstractSkLearnMultipleOneDimVectorRegressionModel
        return {targetFeature: dict(zip(self._modelInputVariableNames, _get_model_feature_importance_vector(model)))
            for targetFeature, model in self.models.items()}

403 

404 

class FeatureImportanceProviderSkLearnRegressionMultiDim(FeatureImportanceProvider):
    """
    Feature importance provider for use with models that hold a single sklearn model
    in the attribute `model` (see the type hint on `self` below)
    """
    def get_feature_importance_dict(self) -> Dict[str, float]:
        """
        :return: a dictionary mapping input variable names to the importance values obtained from the model
        """
        self: AbstractSkLearnMultiDimVectorRegressionModel
        feature_names = self._modelInputVariableNames
        importance_values = _get_model_feature_importance_vector(self.model)
        return dict(zip(feature_names, importance_values))

409 

410 

class FeatureImportanceProviderSkLearnClassification(FeatureImportanceProvider):
    """
    Feature importance provider for use with classification models that hold a single sklearn model
    in the attribute `model` (see the type hint on `self` below)
    """
    def get_feature_importance_dict(self) -> Dict[str, float]:
        """
        :return: a dictionary mapping input variable names to the importance values obtained from the model
        """
        self: AbstractSkLearnVectorClassificationModel
        feature_names = self._modelInputVariableNames
        importance_values = _get_model_feature_importance_vector(self.model)
        return dict(zip(feature_names, importance_values))