Coverage for src/sensai/sklearn/sklearn_base.py: 71% (244 statements), coverage.py v7.6.1, created at 2024-11-29 18:29 +0000
import copy
import logging
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Any, Dict, Optional

import numpy as np
import pandas as pd
from sklearn import compose

from ..feature_importance import FeatureImportanceProvider
from ..util.pickle import setstate
from ..util.string import dict_string
from ..vector_model import VectorRegressionModel, VectorClassificationModel

log = logging.getLogger(__name__)


def create_sklearn_model(model_constructor, model_args, output_transformer=None):
    model = model_constructor(**model_args)
    if output_transformer is not None:
        model = compose.TransformedTargetRegressor(regressor=model, transformer=output_transformer)
    return model
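
# Illustrative usage sketch (not part of the original module; the estimator and transformer are
# assumptions chosen for demonstration). With an output transformer, the model is wrapped in a
# TransformedTargetRegressor, so targets are transformed before fitting and inverse-transformed
# on prediction:
#
#     from sklearn.linear_model import Ridge
#     from sklearn.preprocessing import StandardScaler
#     model = create_sklearn_model(Ridge, {"alpha": 1.0}, output_transformer=StandardScaler())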


def str_sklearn_model(model):
    """
    Creates a cleaned string representation of the model with line breaks and indentations removed

    :param model: the sklearn model for which to generate the cleaned string representation
    :return: the string representation
    """
    return re.sub(r",\s*", ", ", str(model))
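
# For illustration: the repr of an sklearn estimator with many parameters is typically wrapped
# across several indented lines; the substitution above collapses each comma followed by
# whitespace into ", ", yielding a single-line representation such as
# "GradientBoostingRegressor(learning_rate=0.05, n_estimators=500)".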


def _apply_sklearn_input_transformer(inputs: pd.DataFrame, sklearn_input_transformer: Optional, fit: bool) -> pd.DataFrame:
    if sklearn_input_transformer is None:
        return inputs
    else:
        input_values = inputs.values
        shape_before = input_values.shape
        if fit:
            input_values = sklearn_input_transformer.fit_transform(input_values)
        else:
            input_values = sklearn_input_transformer.transform(input_values)
        if input_values.shape != shape_before:
            raise Exception("sklearnInputTransformer changed the shape of the input, which is unsupported. "
                            "Consider using a DFTSkLearnTransformer as a feature transformer instead.")
        return pd.DataFrame(input_values, index=inputs.index, columns=inputs.columns)
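
# For illustration (assumed transformers): a shape-preserving transformer such as
# sklearn.preprocessing.StandardScaler is a valid sklearn input transformer here, whereas a
# dimensionality-reducing transformer such as PCA(n_components=2) applied to wider input would
# change the column count and trigger the exception above; shape-changing transformations are
# to be applied as feature transformers instead.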


class ActualFitParams:
    def __init__(self, inputs, outputs, kwargs: Dict[str, Any]):
        self.inputs = inputs
        self.outputs = outputs
        self.kwargs = kwargs


class AbstractSkLearnVectorRegressionModel(VectorRegressionModel, ABC):
    """
    Base class for models built upon scikit-learn's model implementations
    """
    log = log.getChild(__qualname__)

    def __init__(self, model_constructor, **model_args):
        """
        :param model_constructor: the sklearn model constructor
        :param model_args: arguments to be passed to the sklearn model constructor
        """
        super().__init__()
        self.sklearnInputTransformer = None
        self.sklearnOutputTransformer = None
        self.modelConstructor = model_constructor
        self.modelArgs = model_args
        self.fitArgs = {}

    def _tostring_excludes(self) -> List[str]:
        return super()._tostring_excludes() + ["sklearnInputTransformer", "sklearnOutputTransformer", "modelConstructor", "modelArgs"]

    def with_sklearn_input_transformer(self, sklearn_input_transformer) -> __qualname__:
        """
        :param sklearn_input_transformer: an optional sklearn preprocessor for normalising/scaling inputs
        :return: self
        """
        self.sklearnInputTransformer = sklearn_input_transformer
        return self

    def with_sklearn_output_transformer(self, sklearn_output_transformer):
        """
        :param sklearn_output_transformer: an optional sklearn preprocessor for normalising/scaling outputs
        :return: self
        """
        self.sklearnOutputTransformer = sklearn_output_transformer
        return self

    def _transform_input(self, inputs: pd.DataFrame, fit=False) -> pd.DataFrame:
        return _apply_sklearn_input_transformer(inputs, self.sklearnInputTransformer, fit)

    def _update_model_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        """
        Designed to be overridden in order to make input data-specific changes to modelArgs

        :param inputs: the training input data
        :param outputs: the training output data
        """
        pass

    def _update_fit_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        """
        Designed to be overridden in order to make input data-specific changes to fitArgs (arguments to be passed to the
        underlying model's `fit` method)

        :param inputs: the training input data
        :param outputs: the training output data
        """
        pass

    def _compute_actual_fit_params(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[pd.Series] = None) -> ActualFitParams:
        """
        Computes additional arguments to be passed to the model's `fit` method, which are transient and shall not be saved
        along with the model as metadata, e.g. larger data structures such as validation data or sample weights.

        :param inputs: the training input data
        :param outputs: the training output data
        :param weights: optional sample weights, which are passed on only if the model supports them
        :return: the parameters to be passed to the underlying model's `fit` method
        """
        fit_params = ActualFitParams(inputs, outputs, dict(self.fitArgs))
        if weights is not None:
            self._warn_sample_weights_unsupported(self.is_sample_weight_supported(), weights)
            if self.is_sample_weight_supported():
                fit_params.kwargs["sample_weight"] = weights
        return fit_params

    @abstractmethod
    def is_sample_weight_supported(self) -> bool:
        pass

    def _fit(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[pd.Series] = None):
        inputs = self._transform_input(inputs, fit=True)
        self._update_model_args(inputs, outputs)
        self._update_fit_args(inputs, outputs)
        actual_fit_params = self._compute_actual_fit_params(inputs, outputs, weights=weights)
        self._fit_sklearn(actual_fit_params)

    @abstractmethod
    def _fit_sklearn(self, params: ActualFitParams):
        pass

    def _predict(self, x: pd.DataFrame):
        inputs = self._transform_input(x)
        return self._predict_sklearn(inputs)

    @abstractmethod
    def _predict_sklearn(self, inputs: pd.DataFrame):
        pass


class AbstractSkLearnMultipleOneDimVectorRegressionModel(AbstractSkLearnVectorRegressionModel, ABC):
    """
    Base class for models which use several sklearn models of the same type with a single
    output dimension to create a multi-dimensional model (for the case where there is more than one output dimension)
    """
    def __init__(self, model_constructor, **model_args):
        super().__init__(model_constructor, **model_args)
        self.models = {}

    def _tostring_excludes(self) -> List[str]:
        return super()._tostring_excludes() + ["models"]

    def _tostring_additional_entries(self) -> Dict[str, Any]:
        d = super()._tostring_additional_entries()
        if len(self.models) > 0:
            d["model[0]"] = str_sklearn_model(next(iter(self.models.values())))
        else:
            d["modelConstructor"] = f"{self.modelConstructor.__name__}({dict_string(self.modelArgs)})"
        return d

    def _fit_sklearn(self, params: ActualFitParams):
        for predictedVarName in params.outputs.columns:
            log.info(f"Fitting model for output variable '{predictedVarName}'")
            model = create_sklearn_model(self.modelConstructor,
                                         self.modelArgs,
                                         output_transformer=copy.deepcopy(self.sklearnOutputTransformer))
            model.fit(params.inputs, params.outputs[predictedVarName], **params.kwargs)
            self.models[predictedVarName] = model

    def _predict_sklearn(self, inputs: pd.DataFrame) -> pd.DataFrame:
        results = {}
        for varName in self.models:
            results[varName] = self._predict_sklearn_single_model(self.models[varName], inputs)
        return pd.DataFrame(results)

    def _predict_sklearn_single_model(self, model, inputs: pd.DataFrame) -> np.ndarray:
        return model.predict(inputs)

    def get_sklearn_model(self, predicted_var_name=None):
        if predicted_var_name is None:
            if len(self.models) > 1:
                raise ValueError(f"Must provide predicted variable name (one of {self.models.keys()})")
            return next(iter(self.models.values()))
        return self.models[predicted_var_name]
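
# Minimal sketch of a concrete subclass (hypothetical, not part of the original module): a
# concrete model typically just binds a specific sklearn estimator class and declares whether
# sample weights are supported; one estimator per output column is then fitted by _fit_sklearn.
#
#     from sklearn.ensemble import RandomForestRegressor
#
#     class ExampleRandomForestVectorRegressionModel(AbstractSkLearnMultipleOneDimVectorRegressionModel):
#         def __init__(self, **model_args):
#             super().__init__(RandomForestRegressor, **model_args)
#
#         def is_sample_weight_supported(self) -> bool:
#             return True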


class AbstractSkLearnMultiDimVectorRegressionModel(AbstractSkLearnVectorRegressionModel, ABC):
    """
    Base class for models which use a single sklearn model with multiple output dimensions to create the multi-dimensional model
    """
    def __init__(self, model_constructor, **model_args):
        super().__init__(model_constructor, **model_args)
        self.model = None

    def _tostring_excludes(self) -> List[str]:
        return super()._tostring_excludes() + ["model"]

    def _tostring_additional_entries(self) -> Dict[str, Any]:
        d = super()._tostring_additional_entries()
        if self.model is not None:
            d["model"] = str_sklearn_model(self.model)
        else:
            d["modelConstructor"] = f"{self.modelConstructor.__name__}({dict_string(self.modelArgs)})"
        return d

    def _fit_sklearn(self, params: ActualFitParams):
        if len(params.outputs.columns) > 1:
            log.info(f"Fitting a single multi-dimensional model for all {len(params.outputs.columns)} output dimensions")
        self.model = create_sklearn_model(self.modelConstructor, self.modelArgs, output_transformer=self.sklearnOutputTransformer)
        output_values = params.outputs.values
        if output_values.shape[1] == 1:  # for 1D output, shape must be (numSamples,) rather than (numSamples, 1)
            output_values = np.ravel(output_values)
        self.model.fit(params.inputs, output_values, **params.kwargs)

    def _predict_sklearn(self, inputs: pd.DataFrame) -> pd.DataFrame:
        y = self.model.predict(inputs)
        return pd.DataFrame(y, columns=self.get_model_output_variable_names())


class AbstractSkLearnVectorClassificationModel(VectorClassificationModel, ABC):
    def __init__(self, model_constructor, use_balanced_class_weights=False, use_label_encoding=False, **model_args):
        """
        :param model_constructor: the sklearn model constructor
        :param model_args: arguments to be passed to the sklearn model constructor
        :param use_balanced_class_weights: whether to compute class weights from the training data and apply the corresponding weight to
            each data point such that the sum of weights for all classes is equal. This is achieved by applying a weight proportional
            to the reciprocal frequency of the class in the (training) data. We scale weights such that the smallest weight (of the
            largest class) is 1, ensuring that weight counts still reasonably correspond to data point counts.
            Note that weighted data points may not be supported for all types of models.
        :param use_label_encoding: whether to replace original class labels with 0-based index in sorted list of labels (a.k.a. label
            encoding), which is required by some sklearn-compatible implementations (particularly xgboost)
        """
        super().__init__()
        self.modelConstructor = model_constructor
        self.sklearnInputTransformer = None
        self.modelArgs = model_args
        self.fitArgs = {}
        self.useBalancedClassWeights = use_balanced_class_weights
        self.useLabelEncoding = use_label_encoding
        self.model = None

    def __setstate__(self, state):
        setstate(AbstractSkLearnVectorClassificationModel, self, state, new_optional_properties=["labelEncoder"],
                 new_default_properties={"useComputedClassWeights": False, "useLabelEncoder": False},
                 renamed_properties={"useComputedClassWeights": "useBalancedClassWeights"},
                 removed_properties=["sklearnOutputTransformer"])

    def _tostring_excludes(self) -> List[str]:
        return super()._tostring_excludes() + ["modelConstructor", "sklearnInputTransformer", "sklearnOutputTransformer",
                                               "modelArgs", "model"]

    def _tostring_additional_entries(self) -> Dict[str, Any]:
        d = super()._tostring_additional_entries()
        if self.model is None:
            d["modelConstructor"] = f"{self.modelConstructor.__name__}({dict_string(self.modelArgs)})"
        else:
            d["model"] = str_sklearn_model(self.model)
        return d

    def with_sklearn_input_transformer(self, sklearn_input_transformer) -> __qualname__:
        """
        :param sklearn_input_transformer: an optional sklearn preprocessor for transforming inputs
        :return: self
        """
        self.sklearnInputTransformer = sklearn_input_transformer
        return self

    def _update_model_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        """
        Designed to be overridden in order to make input data-specific changes to modelArgs

        :param inputs: the training input data
        :param outputs: the training output data
        """
        pass

    def _update_fit_args(self, inputs: pd.DataFrame, outputs: pd.DataFrame):
        """
        Designed to be overridden in order to make input data-specific changes to fitArgs (arguments to be passed to the
        underlying model's fit method)

        :param inputs: the training input data
        :param outputs: the training output data
        """
        pass

    @abstractmethod
    def is_sample_weight_supported(self) -> bool:
        pass

    def _fit_classifier(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[pd.Series] = None):
        inputs = self._transform_input(inputs, fit=True)
        self._update_model_args(inputs, outputs)
        self._update_fit_args(inputs, outputs)
        self.model = create_sklearn_model(self.modelConstructor, self.modelArgs)
        log.info(f"Fitting sklearn classifier of type {self.model.__class__.__name__}")
        kwargs = dict(self.fitArgs)

        if self.useBalancedClassWeights and weights is not None:
            raise ValueError("Balanced class weights cannot be used in conjunction with user-specified weights")

        if self.useBalancedClassWeights:
            class2weight = self._compute_class_weights(outputs)
            classes = outputs.iloc[:, 0]
            weights = np.array([class2weight[cls] for cls in classes])
            weights = weights / np.min(weights)
            kwargs["sample_weight"] = weights

        elif weights is not None:
            self._warn_sample_weights_unsupported(self.is_sample_weight_supported(), weights)
            if self.is_sample_weight_supported():
                kwargs["sample_weight"] = weights

        output_values = np.ravel(outputs.values)
        if self.useLabelEncoding:
            output_values = self._encode_labels(output_values)
        self._fit_sklearn_classifier(inputs, output_values, kwargs)

    def _fit_sklearn_classifier(self, inputs: pd.DataFrame, output_values: np.ndarray, kwargs: Dict[str, Any]):
        self.model.fit(inputs, output_values, **kwargs)

    def _transform_input(self, inputs: pd.DataFrame, fit=False) -> pd.DataFrame:
        return _apply_sklearn_input_transformer(inputs, self.sklearnInputTransformer, fit)

    def _encode_labels(self, y: np.ndarray):
        d = {l: i for i, l in enumerate(self._labels)}
        vfn = np.vectorize(lambda x: d[x])
        return vfn(y)

    def _decode_labels(self, y: np.ndarray):
        d = dict(enumerate(self._labels))
        vfn = np.vectorize(lambda x: d[x])
        return vfn(y)
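
    # Worked example (hypothetical labels, for illustration only): with self._labels == ["a", "b", "c"],
    # _encode_labels maps np.array(["b", "a", "c"]) to np.array([1, 0, 2]), and _decode_labels maps
    # np.array([2, 0]) back to np.array(["c", "a"]), i.e. each label is replaced by its 0-based index
    # in the label list and vice versa.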

    def _predict_sklearn(self, input_values):
        return self.model.predict(input_values)

    def _predict(self, x: pd.DataFrame):
        input_values = self._transform_input(x)
        y = self._predict_sklearn(input_values)
        if self.useLabelEncoding:
            y = self._decode_labels(y)
        return pd.DataFrame(y, columns=self._predictedVariableNames)

    def _predict_class_probabilities(self, x: pd.DataFrame):
        input_values = self._transform_input(x)
        y = self.model.predict_proba(input_values)
        return pd.DataFrame(y, columns=self._labels)

    def get_params(self, deep=True):
        return self.model.get_params(deep=deep)

    def set_params(self, **params):
        self.model.set_params(**params)

    # noinspection PyMethodMayBeStatic
    def _compute_class_weights(self, outputs: pd.DataFrame):
        """
        :param outputs: the output data frame containing the class labels as the first column
        :return: the dictionary of class weights mapping class to weight value
        """
        classes: pd.Series = outputs.iloc[:, 0]
        counts = classes.value_counts()
        rfreqs = counts / counts.sum()
        weights: pd.Series = 1.0 / rfreqs
        return weights.to_dict()
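
    # Worked example (hypothetical counts, for illustration only): with 80 samples of class "a" and
    # 20 of class "b", the relative frequencies are 0.8 and 0.2, so the reciprocal weights returned
    # here are 1.25 and 5.0. In _fit_classifier these are divided by their minimum, giving 1.0 for
    # "a" and 4.0 for "b", so both classes end up with the same total weight (80 * 1.0 == 20 * 4.0).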


def _get_model_feature_importance_vector(model):
    cand_attributes = ("feature_importances_", "coef_")
    for attr in cand_attributes:
        if hasattr(model, attr):
            importance_values = getattr(model, attr)
            if attr == "coef_":
                importance_values = np.abs(importance_values)  # for coefficients in linear models, use the absolute values
            return importance_values
    raise ValueError(f"Model {model} has none of the attributes {cand_attributes}")
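
# For illustration (standard sklearn attributes): a fitted tree ensemble such as
# RandomForestRegressor exposes feature_importances_, whereas a fitted linear model such as Ridge
# exposes coef_; in the latter case the absolute coefficient values serve as the importance vector.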


class FeatureImportanceProviderSkLearnRegressionMultipleOneDim(FeatureImportanceProvider):
    def get_feature_importance_dict(self) -> Dict[str, Dict[str, float]]:
        self: AbstractSkLearnMultipleOneDimVectorRegressionModel
        return {targetFeature: dict(zip(self._modelInputVariableNames, _get_model_feature_importance_vector(model)))
                for targetFeature, model in self.models.items()}


class FeatureImportanceProviderSkLearnRegressionMultiDim(FeatureImportanceProvider):
    def get_feature_importance_dict(self) -> Dict[str, float]:
        self: AbstractSkLearnMultiDimVectorRegressionModel
        return dict(zip(self._modelInputVariableNames, _get_model_feature_importance_vector(self.model)))


class FeatureImportanceProviderSkLearnClassification(FeatureImportanceProvider):
    def get_feature_importance_dict(self) -> Dict[str, float]:
        self: AbstractSkLearnVectorClassificationModel
        return dict(zip(self._modelInputVariableNames, _get_model_feature_importance_vector(self.model)))