Coverage for src/sensai/sklearn/sklearn_base.py: 72%
218 statements
import copy
import logging
import re
from abc import ABC, abstractmethod
from typing import List, Any, Dict, Optional

import numpy as np
import pandas as pd
from sklearn import compose

from ..feature_importance import FeatureImportanceProvider
from ..util.pickle import setstate
from ..util.string import dict_string
from ..vector_model import VectorRegressionModel, VectorClassificationModel

log = logging.getLogger(__name__)

def create_sklearn_model(model_constructor, model_args, output_transformer=None):
    model = model_constructor(**model_args)
    if output_transformer is not None:
        model = compose.TransformedTargetRegressor(regressor=model, transformer=output_transformer)
    return model
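
# Illustrative usage sketch (not part of the original module): with an output transformer,
# create_sklearn_model wraps the estimator in sklearn's TransformedTargetRegressor, so targets
# are transformed before fitting and inverse-transformed on prediction. Ridge and StandardScaler
# are arbitrary example choices.
def _example_create_sklearn_model():
    from sklearn.linear_model import Ridge
    from sklearn.preprocessing import StandardScaler
    # equivalent to TransformedTargetRegressor(regressor=Ridge(alpha=1.0), transformer=StandardScaler())
    return create_sklearn_model(Ridge, {"alpha": 1.0}, output_transformer=StandardScaler())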

def str_sklearn_model(model):
    """
    Creates a cleaned string representation of the model with line breaks and indentations removed

    :param model: the sklearn model for which to generate the cleaned string representation
    :return: the string representation
    """
    return re.sub(r",\s*", ", ", str(model))

def _apply_sklearn_input_transformer(inputs: pd.DataFrame, sklearn_input_transformer: Optional, fit: bool) -> pd.DataFrame:
    if sklearn_input_transformer is None:
        return inputs
    else:
        input_values = inputs.values
        shape_before = input_values.shape
        if fit:
            input_values = sklearn_input_transformer.fit_transform(input_values)
        else:
            input_values = sklearn_input_transformer.transform(input_values)
        if input_values.shape != shape_before:
            raise Exception("sklearnInputTransformer changed the shape of the input, which is unsupported. "
                "Consider using a DFTSkLearnTransformer as a feature transformer instead.")
        return pd.DataFrame(input_values, index=inputs.index, columns=inputs.columns)
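
# Illustrative usage sketch (not part of the original module): a shape-preserving sklearn
# preprocessor such as StandardScaler can be passed through _apply_sklearn_input_transformer;
# the index and column names of the input data frame are retained.
def _example_input_transformer_usage():
    from sklearn.preprocessing import StandardScaler
    df = pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": [10.0, 20.0, 30.0]})
    # fit=True fits the scaler and transforms; fit=False would only transform
    return _apply_sklearn_input_transformer(df, StandardScaler(), fit=True)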

class AbstractSkLearnVectorRegressionModel(VectorRegressionModel, ABC):
    """
    Base class for models built upon scikit-learn's model implementations
    """
    log = log.getChild(__qualname__)

    def __init__(self, model_constructor, **model_args):
        """
        :param model_constructor: the sklearn model constructor
        :param model_args: arguments to be passed to the sklearn model constructor
        """
        super().__init__()
        self.sklearnInputTransformer = None
        self.sklearnOutputTransformer = None
        self.modelConstructor = model_constructor
        self.modelArgs = model_args
        self.fitArgs = {}

    def _tostring_excludes(self) -> List[str]:
        return super()._tostring_excludes() + ["sklearnInputTransformer", "sklearnOutputTransformer", "modelConstructor", "modelArgs"]

    def with_sklearn_input_transformer(self, sklearn_input_transformer) -> __qualname__:
        """
        :param sklearn_input_transformer: an optional sklearn preprocessor for normalising/scaling inputs
        :return: self
        """
        self.sklearnInputTransformer = sklearn_input_transformer
        return self

    def with_sklearn_output_transformer(self, sklearn_output_transformer):
        """
        :param sklearn_output_transformer: an optional sklearn preprocessor for normalising/scaling outputs
        :return: self
        """
        self.sklearnOutputTransformer = sklearn_output_transformer
        return self

    def _transform_input(self, inputs: pd.DataFrame, fit=False) -> pd.DataFrame:
        return _apply_sklearn_input_transformer(inputs, self.sklearnInputTransformer, fit)

    def _update_model_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        """
        Designed to be overridden in order to make input data-specific changes to modelArgs

        :param inputs: the training input data
        :param outputs: the training output data
        """
        pass

    def _update_fit_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        """
        Designed to be overridden in order to make input data-specific changes to fitArgs (arguments to be passed to the
        underlying model's fit method)

        :param inputs: the training input data
        :param outputs: the training output data
        """
        pass

    def _fit(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        inputs = self._transform_input(inputs, fit=True)
        self._update_model_args(inputs, outputs)
        self._update_fit_args(inputs, outputs)
        self._fit_sklearn(inputs, outputs)

    @abstractmethod
    def _fit_sklearn(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        pass

    def _predict(self, x: pd.DataFrame):
        inputs = self._transform_input(x)
        return self._predict_sklearn(inputs)

    @abstractmethod
    def _predict_sklearn(self, inputs: pd.DataFrame):
        pass

class AbstractSkLearnMultipleOneDimVectorRegressionModel(AbstractSkLearnVectorRegressionModel, ABC):
    """
    Base class for models which use several sklearn models of the same type with a single
    output dimension to create a multi-dimensional model (for the case where there is more than one output dimension)
    """
    def __init__(self, model_constructor, **model_args):
        super().__init__(model_constructor, **model_args)
        self.models = {}

    def _tostring_excludes(self) -> List[str]:
        return super()._tostring_excludes() + ["models"]

    def _tostring_additional_entries(self) -> Dict[str, Any]:
        d = super()._tostring_additional_entries()
        if len(self.models) > 0:
            d["model[0]"] = str_sklearn_model(next(iter(self.models.values())))
        else:
            d["modelConstructor"] = f"{self.modelConstructor.__name__}({dict_string(self.modelArgs)})"
        return d

    def _fit_sklearn(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        for predictedVarName in outputs.columns:
            log.info(f"Fitting model for output variable '{predictedVarName}'")
            model = create_sklearn_model(self.modelConstructor,
                self.modelArgs,
                output_transformer=copy.deepcopy(self.sklearnOutputTransformer))
            model.fit(inputs, outputs[predictedVarName], **self.fitArgs)
            self.models[predictedVarName] = model

    def _predict_sklearn(self, inputs: pd.DataFrame) -> pd.DataFrame:
        results = {}
        for varName in self.models:
            results[varName] = self._predict_sklearn_single_model(self.models[varName], inputs)
        return pd.DataFrame(results)

    def _predict_sklearn_single_model(self, model, inputs: pd.DataFrame) -> np.ndarray:
        return model.predict(inputs)

    def get_sklearn_model(self, predicted_var_name=None):
        if predicted_var_name is None:
            if len(self.models) > 1:
                raise ValueError(f"Must provide predicted variable name (one of {self.models.keys()})")
            return next(iter(self.models.values()))
        return self.models[predicted_var_name]
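
# Hypothetical concrete subclass (illustrative sketch, not part of the original module): each
# output column gets its own one-dimensional RandomForestRegressor. The class name is an
# assumption; sensAI's actual concrete sklearn regression models are defined in a separate module.
class _ExampleRandomForestVectorRegressionModel(AbstractSkLearnMultipleOneDimVectorRegressionModel):
    def __init__(self, n_estimators: int = 100, **model_args):
        # imported locally to keep the example self-contained without a module-level dependency
        from sklearn.ensemble import RandomForestRegressor
        super().__init__(RandomForestRegressor, n_estimators=n_estimators, **model_args)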

class AbstractSkLearnMultiDimVectorRegressionModel(AbstractSkLearnVectorRegressionModel, ABC):
    """
    Base class for models which use a single sklearn model with multiple output dimensions to create the multi-dimensional model
    """
    def __init__(self, model_constructor, **model_args):
        super().__init__(model_constructor, **model_args)
        self.model = None

    def _tostring_excludes(self) -> List[str]:
        return super()._tostring_excludes() + ["model"]

    def _tostring_additional_entries(self) -> Dict[str, Any]:
        d = super()._tostring_additional_entries()
        if self.model is not None:
            d["model"] = str_sklearn_model(self.model)
        else:
            d["modelConstructor"] = f"{self.modelConstructor.__name__}({dict_string(self.modelArgs)})"
        return d

    def _fit_sklearn(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        if len(outputs.columns) > 1:
            log.info(f"Fitting a single multi-dimensional model for all {len(outputs.columns)} output dimensions")
        self.model = create_sklearn_model(self.modelConstructor, self.modelArgs, output_transformer=self.sklearnOutputTransformer)
        output_values = outputs.values
        if output_values.shape[1] == 1:  # for 1D output, shape must be (numSamples,) rather than (numSamples, 1)
            output_values = np.ravel(output_values)
        self.model.fit(inputs, output_values, **self.fitArgs)

    def _predict_sklearn(self, inputs: pd.DataFrame) -> pd.DataFrame:
        y = self.model.predict(inputs)
        return pd.DataFrame(y, columns=self.get_model_output_variable_names())
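
# Hypothetical counterpart sketch (illustrative, not part of the original module): a single
# natively multi-output sklearn estimator is fitted for all output columns at once. The class
# name is an assumption; sensAI's concrete sklearn models are defined elsewhere.
class _ExampleMultiOutputLinearRegressionModel(AbstractSkLearnMultiDimVectorRegressionModel):
    def __init__(self, **model_args):
        from sklearn.linear_model import LinearRegression  # natively supports multi-output targets
        super().__init__(LinearRegression, **model_args)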

class AbstractSkLearnVectorClassificationModel(VectorClassificationModel, ABC):
    def __init__(self, model_constructor, use_balanced_class_weights=False, use_label_encoding=False, **model_args):
        """
        :param model_constructor: the sklearn model constructor
        :param model_args: arguments to be passed to the sklearn model constructor
        :param use_balanced_class_weights: whether to compute class weights from the training data and apply the corresponding weight to
            each data point such that the sum of weights for all classes is equal. This is achieved by applying a weight proportional
            to the reciprocal frequency of the class in the (training) data. We scale weights such that the smallest weight (of the
            largest class) is 1, ensuring that weight counts still reasonably correspond to data point counts.
            Note that weighted data points may not be supported for all types of models.
        :param use_label_encoding: whether to replace original class labels with 0-based index in sorted list of labels (a.k.a. label
            encoding), which is required by some sklearn-compatible implementations (particularly xgboost)
        """
        super().__init__()
        self.modelConstructor = model_constructor
        self.sklearnInputTransformer = None
        self.modelArgs = model_args
        self.fitArgs = {}
        self.useBalancedClassWeights = use_balanced_class_weights
        self.useLabelEncoding = use_label_encoding
        self.model = None

    def __setstate__(self, state):
        setstate(AbstractSkLearnVectorClassificationModel, self, state, new_optional_properties=["labelEncoder"],
            new_default_properties={"useComputedClassWeights": False, "useLabelEncoder": False},
            renamed_properties={"useComputedClassWeights": "useBalancedClassWeights"},
            removed_properties=["sklearnOutputTransformer"])

    def _tostring_excludes(self) -> List[str]:
        return super()._tostring_excludes() + ["modelConstructor", "sklearnInputTransformer", "sklearnOutputTransformer",
            "modelArgs", "model"]

    def _tostring_additional_entries(self) -> Dict[str, Any]:
        d = super()._tostring_additional_entries()
        if self.model is None:
            d["modelConstructor"] = f"{self.modelConstructor.__name__}({dict_string(self.modelArgs)})"
        else:
            d["model"] = str_sklearn_model(self.model)
        return d

    def with_sklearn_input_transformer(self, sklearn_input_transformer) -> __qualname__:
        """
        :param sklearn_input_transformer: an optional sklearn preprocessor for transforming inputs
        :return: self
        """
        self.sklearnInputTransformer = sklearn_input_transformer
        return self

    def _update_model_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        """
        Designed to be overridden in order to make input data-specific changes to modelArgs

        :param inputs: the training input data
        :param outputs: the training output data
        """
        pass

    def _update_fit_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        """
        Designed to be overridden in order to make input data-specific changes to fitArgs (arguments to be passed to the
        underlying model's fit method)

        :param inputs: the training input data
        :param outputs: the training output data
        """
        pass

    def _fit_classifier(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        inputs = self._transform_input(inputs, fit=True)
        self._update_model_args(inputs, outputs)
        self._update_fit_args(inputs, outputs)
        self.model = create_sklearn_model(self.modelConstructor, self.modelArgs)
        log.info(f"Fitting sklearn classifier of type {self.model.__class__.__name__}")
        kwargs = dict(self.fitArgs)
        if self.useBalancedClassWeights:
            class2weight = self._compute_class_weights(outputs)
            classes = outputs.iloc[:, 0]
            weights = np.array([class2weight[cls] for cls in classes])
            weights = weights / np.min(weights)
            kwargs["sample_weight"] = weights

        output_values = np.ravel(outputs.values)
        if self.useLabelEncoding:
            output_values = self._encode_labels(output_values)
        self._fit_sklearn_classifier(inputs, output_values, kwargs)

    def _fit_sklearn_classifier(self, inputs: pd.DataFrame, output_values: np.ndarray, kwargs: Dict[str, Any]):
        self.model.fit(inputs, output_values, **kwargs)

    def _transform_input(self, inputs: pd.DataFrame, fit=False) -> pd.DataFrame:
        return _apply_sklearn_input_transformer(inputs, self.sklearnInputTransformer, fit)

    def _encode_labels(self, y: np.ndarray):
        d = {l: i for i, l in enumerate(self._labels)}
        vfn = np.vectorize(lambda x: d[x])
        return vfn(y)

    def _decode_labels(self, y: np.ndarray):
        d = dict(enumerate(self._labels))
        vfn = np.vectorize(lambda x: d[x])
        return vfn(y)
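
    # Illustrative example (added comment, hypothetical labels): with self._labels == ["cat", "dog"],
    # _encode_labels maps ["cat", "dog", "cat"] to [0, 1, 0] (each label replaced by its index in the
    # sorted label list, as required e.g. by xgboost), and _decode_labels applies the inverse mapping
    # to predictions, e.g. [1, 0] -> ["dog", "cat"].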

    def _predict_sklearn(self, input_values):
        return self.model.predict(input_values)

    def _predict(self, x: pd.DataFrame):
        input_values = self._transform_input(x)
        y = self._predict_sklearn(input_values)
        if self.useLabelEncoding:
            y = self._decode_labels(y)
        return pd.DataFrame(y, columns=self._predictedVariableNames)

    def _predict_class_probabilities(self, x: pd.DataFrame):
        input_values = self._transform_input(x)
        y = self.model.predict_proba(input_values)
        return pd.DataFrame(y, columns=self._labels)

    def get_params(self, deep=True):
        return self.model.get_params(deep=deep)

    def set_params(self, **params):
        self.model.set_params(**params)

    # noinspection PyMethodMayBeStatic
    def _compute_class_weights(self, outputs: pd.DataFrame):
        """
        :param outputs: the output data frame containing the class labels as the first column
        :return: the dictionary of class weights mapping class to weight value
        """
        classes: pd.Series = outputs.iloc[:, 0]
        counts = classes.value_counts()
        rfreqs = counts / counts.sum()
        weights: pd.Series = 1.0 / rfreqs
        return weights.to_dict()
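
# Worked example (illustrative sketch, not part of the original module): the reciprocal-frequency
# class weighting described in AbstractSkLearnVectorClassificationModel.__init__, reproduced on a
# small imbalanced target. In _fit_classifier the weights are subsequently divided by their minimum,
# so the largest class receives weight 1.
def _example_balanced_class_weights():
    outputs = pd.DataFrame({"target": ["a", "a", "a", "b"]})
    counts = outputs.iloc[:, 0].value_counts()   # a: 3, b: 1
    weights = 1.0 / (counts / counts.sum())      # a: 4/3, b: 4.0 (reciprocal relative frequencies)
    weights = weights / weights.min()            # a: 1.0, b: 3.0 (smallest weight, largest class, is 1)
    return weights.to_dict()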

def _get_model_feature_importance_vector(model):
    cand_attributes = ("feature_importances_", "coef_")
    for attr in cand_attributes:
        if hasattr(model, attr):
            importance_values = getattr(model, attr)
            if attr == "coef_":
                importance_values = np.abs(importance_values)  # for coefficients in linear models, use the absolute values
            return importance_values
    raise ValueError(f"Model {model} has none of the attributes {cand_attributes}")
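
# Illustrative sketch (not part of the original module): the helper reads feature_importances_ from
# tree ensembles and the absolute values of coef_ from fitted linear models, as shown here with an
# arbitrary LinearRegression on toy data.
def _example_feature_importance_vector():
    from sklearn.linear_model import LinearRegression
    X = np.array([[0.0, 1.0], [1.0, 0.0], [2.0, 2.0]])
    y = np.array([1.0, -1.0, 0.0])
    model = LinearRegression().fit(X, y)
    return _get_model_feature_importance_vector(model)  # absolute values of the two coefficients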

class FeatureImportanceProviderSkLearnRegressionMultipleOneDim(FeatureImportanceProvider):
    def get_feature_importance_dict(self) -> Dict[str, Dict[str, float]]:
        self: AbstractSkLearnMultipleOneDimVectorRegressionModel
        return {targetFeature: dict(zip(self._modelInputVariableNames, _get_model_feature_importance_vector(model)))
            for targetFeature, model in self.models.items()}


class FeatureImportanceProviderSkLearnRegressionMultiDim(FeatureImportanceProvider):
    def get_feature_importance_dict(self) -> Dict[str, float]:
        self: AbstractSkLearnMultiDimVectorRegressionModel
        return dict(zip(self._modelInputVariableNames, _get_model_feature_importance_vector(self.model)))


class FeatureImportanceProviderSkLearnClassification(FeatureImportanceProvider):
    def get_feature_importance_dict(self) -> Dict[str, float]:
        self: AbstractSkLearnVectorClassificationModel
        return dict(zip(self._modelInputVariableNames, _get_model_feature_importance_vector(self.model)))