Coverage for src/sensai/torch/torch_base.py: 70%
454 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-29 18:29 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-29 18:29 +0000
1import functools
2import io
3import logging
4import typing
5from abc import ABC, abstractmethod
6from typing import Union, Tuple, Callable, Optional, List, Sequence
8import numpy as np
9import pandas as pd
10import torch
11from torch import nn
12from torch.nn import functional as F
14from .torch_data import TensorScaler, VectorDataUtil, ClassificationVectorDataUtil, TorchDataSet, \
15 TorchDataSetProvider, Tensoriser, TorchDataSetFromDataFrames, RuleBasedTensoriser, \
16 TorchDataSetProviderFromVectorDataUtil
17from .torch_enums import ClassificationOutputMode
18from .torch_opt import NNOptimiser, NNLossEvaluatorRegression, NNLossEvaluatorClassification, NNOptimiserParams, TrainingInfo
19from ..data import DataFrameSplitter
20from ..normalisation import NormalisationMode
21from ..util.dtype import to_float_array
22from ..util.pickle import setstate
23from ..util.string import ToStringMixin
24from ..vector_model import VectorRegressionModel, VectorClassificationModel, TrainingContext
# Module-level logger; TorchModel derives its class-level child logger from it via log.getChild.
log: logging.Logger = logging.getLogger(__name__)
class MCDropoutCapableNNModule(nn.Module, ABC):
    """
    Base class for NN modules that shall support MC-Dropout.
    A subclass adds support by passing activations through :meth:`_dropout` within its forward method.
    Sampling-based inference is then obtained by calling :meth:`infer_mc_dropout` instead of using ``__call__``.
    """

    def __init__(self) -> None:
        super().__init__()
        self._applyMCDropout = False
        self._pMCDropoutOverride = None

    def __setstate__(self, d: dict) -> None:
        # states that do not yet contain the MC-Dropout members receive the constructor defaults
        d.setdefault("_applyMCDropout", False)
        d.setdefault("_pMCDropoutOverride", None)
        super().__setstate__(d)

    def _dropout(self, x: torch.Tensor, p_training=None, p_inference=None) -> torch.Tensor:
        """
        To be called within the module's forward method in order to apply dropouts during training and/or
        MC-Dropout-based inference.

        :param x: the tensor to which dropout may be applied
        :param p_training: the dropout probability to use while the module is in training mode; if None, no dropout
            is applied during training
        :param p_inference: the dropout probability to use during MC-Dropout-based inference (via infer_mc_dropout,
            whose optional argument may override this probability); if None, no dropout is applied during inference
        :return: x, potentially with some elements dropped out, depending on the mode the module is in and the
            given probabilities
        """
        if self.training:
            if p_training is None:
                return x
            return F.dropout(x, p_training)
        if self._applyMCDropout and p_inference is not None:
            probability = self._pMCDropoutOverride
            if probability is None:
                probability = p_inference
            return F.dropout(x, probability)
        return x

    def _enable_mc_dropout(self, enabled=True, p_mc_dropout_override=None) -> None:
        # both members are always written together, so disabling also resets the override (default None)
        self._pMCDropoutOverride = p_mc_dropout_override
        self._applyMCDropout = enabled

    def infer_mc_dropout(self, x: Union[torch.Tensor, Sequence[torch.Tensor]], num_samples, p=None) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Performs inference with MC-Dropout, i.e. the module is applied ``num_samples`` times with dropout enabled.

        :param x: the model input (a tensor or a tuple/list of tensors, which is unpacked into positional arguments)
        :param num_samples: the number of samples to draw
        :param p: the dropout probability to apply, overriding the probability specified in the model's forward
            method; if None, use the model's default
        :return: a pair (y, sd) where y is the mean output tensor and sd is a tensor of the same dimension containing
            the (biased) standard deviations across the samples
        """
        inputs = x if type(x) in (tuple, list) else [x]
        self._enable_mc_dropout(True, p_mc_dropout_override=p)
        try:
            samples = [self(*inputs) for _ in range(num_samples)]
        finally:
            # always switch MC-Dropout off again, even if a forward pass failed
            self._enable_mc_dropout(False)
        stacked = torch.stack(samples)
        return torch.mean(stacked, dim=0), torch.std(stacked, dim=0, unbiased=False)
class TorchModel(ABC, ToStringMixin):
    """
    sensAI abstraction for torch models, which supports one-line training, allows for convenient model application,
    has basic mechanisms for data scaling, and soundly handles persistence (via pickle).
    An instance wraps a torch.nn.Module, which is constructed on demand during training via the factory method
    create_torch_module.
    """
    log: logging.Logger = log.getChild(__qualname__)

    def __init__(self, cuda=True) -> None:
        """
        :param cuda: whether CUDA is to be used for this model
        """
        self.cuda: bool = cuda
        self.module: Optional[torch.nn.Module] = None
        self.outputScaler: Optional[TensorScaler] = None
        self.inputScaler: Optional[TensorScaler] = None
        self.trainingInfo: Optional[TrainingInfo] = None
        self._gpu: Optional[int] = None
        self._normalisationCheckThreshold: Optional[float] = 5

    def _tostring_exclude_private(self) -> bool:
        return True

    def set_torch_module(self, module: torch.nn.Module) -> None:
        self.module = module

    def set_normalisation_check_threshold(self, threshold: Optional[float]) -> None:
        """
        :param threshold: the absolute input value above which :meth:`apply` logs a warning about potentially
            missing normalisation; None disables the check
        """
        self._normalisationCheckThreshold = threshold

    def get_module_bytes(self) -> bytes:
        """
        :return: the wrapped torch module, serialised via torch.save
        """
        bytes_io = io.BytesIO()
        torch.save(self.module, bytes_io)
        return bytes_io.getvalue()

    def set_module_bytes(self, model_bytes: bytes) -> None:
        """
        :param model_bytes: a serialised torch module as returned by :meth:`get_module_bytes`
        """
        model_file = io.BytesIO(model_bytes)
        self._load_model(model_file)

    def get_torch_module(self) -> torch.nn.Module:
        return self.module

    def _set_cuda_enabled(self, is_cuda_enabled: bool) -> None:
        self.cuda = is_cuda_enabled

    def _is_cuda_enabled(self) -> bool:
        return self.cuda

    @staticmethod
    def _rewind_model_file(model_file) -> None:
        """
        Resets a file-like object to its start so that it can be read again; objects without a ``seek`` method
        (e.g. paths) are left untouched.
        """
        seek = getattr(model_file, "seek", None)
        if callable(seek):
            seek(0)

    def _load_model(self, model_file) -> None:  # TODO: complete type hints: what types are allowed for modelFile?
        """
        Loads the torch module from the given file. If loading fails and CUDA is enabled for this instance,
        loading is retried with the model being mapped to the first CUDA device (if any) and finally to the CPU.

        :param model_file: a path or file-like object accepted by torch.load
        :raises Exception: the loading error, if CUDA is not enabled or if all fallbacks failed
        """
        # NOTE(review): torch.load unpickles arbitrary objects; model files/bytes must come from a trusted source.
        try:
            self.module = torch.load(model_file)
            self._gpu = self._get_gpu_from_model_parameter_device()
        except Exception:
            if not self._is_cuda_enabled():
                raise
            new_device = "cuda:0" if torch.cuda.device_count() > 0 else "cpu"
            self.log.warning(f"Loading of CUDA model failed, trying to map model to device {new_device}...")
            self._rewind_model_file(model_file)
            try:
                self.module = torch.load(model_file, map_location=new_device)
            except Exception:
                if new_device == "cpu":
                    # the CPU fallback itself failed, so there is nothing left to try
                    raise
                self.log.warning(f"Failure to map model to device {new_device}, trying CPU...")
                new_device = "cpu"
                # the failed attempt may have consumed part of the stream
                self._rewind_model_file(model_file)
                self.module = torch.load(model_file, map_location=new_device)
            if new_device == "cpu":
                self._set_cuda_enabled(False)
                self._gpu = None
            else:
                self._gpu = 0
            self.log.info(f"Model successfully loaded to {new_device}")

    @abstractmethod
    def create_torch_module(self) -> torch.nn.Module:
        pass

    def __getstate__(self) -> dict:
        # the module is persisted in serialised form (via torch.save) rather than being pickled directly
        state = dict(self.__dict__)
        del state["module"]
        state["modelBytes"] = self.get_module_bytes()
        return state

    def __setstate__(self, d: dict) -> None:
        # backward compatibility
        if "bestEpoch" in d:
            d["trainingInfo"] = TrainingInfo(best_epoch=d["bestEpoch"])
            del d["bestEpoch"]
        new_default_properties = {"_normalisationCheckThreshold": 5}

        model_bytes = d.pop("modelBytes", None)
        setstate(TorchModel, self, d, new_default_properties=new_default_properties)
        if model_bytes is not None:
            self.set_module_bytes(model_bytes)

    def apply(self,
            x: Union[torch.Tensor, np.ndarray, TorchDataSet, Sequence[torch.Tensor]],
            as_numpy: bool = True, create_batch: bool = False,
            mc_dropout_samples: Optional[int] = None,
            mc_dropout_probability: Optional[float] = None,
            scale_output: bool = False,
            scale_input: bool = False) -> Union[torch.Tensor, np.ndarray, Tuple]:
        """
        Applies the model to the given input tensor and returns the result

        :param x: the input tensor (either a batch or, if create_batch=True, a single data point), a data set or a tuple/list of tensors
            (if the model accepts more than one input).
            If it is a data set, it will be processed at once, so the data set must not be too large to be processed at once.
        :param as_numpy: flag indicating whether to convert the result to a numpy.array (if False, return tensor)
        :param create_batch: whether to add an additional tensor dimension for a batch containing just one data point
        :param mc_dropout_samples: if not None, apply MC-Dropout-based inference with the respective number of samples; if None, apply
            regular inference. MC-Dropout requires the module to provide the method infer_mc_dropout
            (see MCDropoutCapableNNModule).
        :param mc_dropout_probability: the probability with which to apply dropouts in MC-Dropout-based inference; if None, use model's
            default
        :param scale_output: whether to scale the output that is produced by the underlying model (using this instance's output scaler,
            if any)
        :param scale_input: whether to scale the input (using this instance's input scaler, if any) before applying the underlying model

        :return: an output tensor or, if MC-Dropout is applied, a pair (y, sd) where y the mean output tensor and sd is a tensor of the
            same dimension containing standard deviations
        """
        def extract(z):
            if scale_output:
                z = self.scaled_output(z)
            if self._is_cuda_enabled():
                z = z.cpu()
            z = z.detach()
            if as_numpy:
                z = z.numpy()
            return z

        model = self.get_torch_module()
        model.eval()

        if isinstance(x, TorchDataSet):
            # a single batch which comprises the entire data set
            x = next(x.iter_batches(x.size(), input_only=True, shuffle=False))
        elif isinstance(x, np.ndarray):
            x = to_float_array(x)
            x = torch.from_numpy(x).float()

        if type(x) not in (list, tuple):
            inputs = [x]
        else:
            inputs = x

        if self._is_cuda_enabled():
            torch.cuda.set_device(self._gpu)
            inputs = [t.cuda() for t in inputs]
        if scale_input and self.inputScaler is not None:
            inputs = [self.inputScaler.normalise(t) for t in inputs]
        if create_batch:
            # each tensor is extended based on its own shape (inputs may differ in shape)
            inputs = [t.view(1, *t.size()) for t in inputs]

        # check input normalisation
        if self._normalisationCheckThreshold is not None:
            for i, t in enumerate(inputs):
                if t.is_floating_point() and t.numel() > 0:  # skip any integer tensors (which typically contain lengths) and empty tensors
                    max_value = t.abs().max().item()
                    if max_value > self._normalisationCheckThreshold:
                        log.warning(f"Received value in input tensor {i} which is likely to not be correctly normalised: "
                            f"maximum abs. value in tensor is {max_value:f}")

        if mc_dropout_samples is None:
            y = model(*inputs)
            return extract(y)
        else:
            # the prepared inputs (moved to the device, scaled, batched) must be used here, not the raw x
            y, stddev = model.infer_mc_dropout(inputs, mc_dropout_samples, p=mc_dropout_probability)
            # NOTE(review): with scale_output=True, the standard deviation passes through the same output
            # de-normalisation as the mean - confirm that this is appropriate for the scaler in use
            return extract(y), extract(stddev)

    def apply_scaled(self, x: Union[torch.Tensor, np.ndarray, TorchDataSet, Sequence[torch.Tensor]],
            as_numpy: bool = True,
            create_batch: bool = False,
            mc_dropout_samples: Optional[int] = None,
            mc_dropout_probability: Optional[float] = None) \
            -> Union[torch.Tensor, np.ndarray]:
        """
        applies the model to the given input tensor and returns the scaled result (i.e. in the original scale)

        :param x: the input tensor(s) or data set
        :param as_numpy: flag indicating whether to convert the result to a numpy.array (if False, return tensor)
        :param create_batch: whether to add an additional tensor dimension for a batch containing just one data point
        :param mc_dropout_samples: if not None, apply MC-Dropout-based inference with the respective number of samples; if None, apply
            regular inference
        :param mc_dropout_probability: the probability with which to apply dropouts in MC-Dropout-based inference; if None, use model's
            default

        :return: a scaled output tensor or, if MC-Dropout is applied, a pair (y, sd) of scaled tensors, where
            y the mean output tensor and sd is a tensor of the same dimension containing standard deviations
        """
        return self.apply(x, scale_output=True, scale_input=True, as_numpy=as_numpy, create_batch=create_batch,
            mc_dropout_samples=mc_dropout_samples, mc_dropout_probability=mc_dropout_probability)

    def scaled_output(self, output: torch.Tensor) -> torch.Tensor:
        """
        :param output: a tensor as produced by the underlying module
        :return: the de-normalised tensor according to this instance's output scaler; if there is no output scaler,
            the tensor is returned unchanged
        """
        if self.outputScaler is None:
            return output
        return self.outputScaler.denormalise(output)

    def _extract_params_from_data(self, data: TorchDataSetProvider) -> None:
        self.outputScaler = data.get_output_tensor_scaler()
        self.inputScaler = data.get_input_tensor_scaler()

    def fit(self, data: TorchDataSetProvider, nn_optimiser_params: NNOptimiserParams, strategy: "TorchModelFittingStrategy" = None) \
            -> None:
        """
        Fits this model using the given data and strategy

        :param data: a provider for the data with which to fit the model
        :param strategy: the fitting strategy; if None, use TorchModelFittingStrategyDefault.
            Pass your own strategy to perform custom fitting processes, e.g. process which involve multi-stage learning
        :param nn_optimiser_params: the parameters with which to create an optimiser which can be applied in the fitting strategy
        """
        self._extract_params_from_data(data)
        optimiser = NNOptimiser(nn_optimiser_params)
        if strategy is None:
            strategy = TorchModelFittingStrategyDefault()
        self.trainingInfo = strategy.fit(self, data, optimiser)
        self._gpu = self._get_gpu_from_model_parameter_device()

    def _get_gpu_from_model_parameter_device(self) -> Optional[int]:
        """
        :return: the device index reported by the module's first parameter, or None if it cannot be determined
            (e.g. no module or a module without parameters)
        """
        try:
            return next(self.module.parameters()).get_device()
        except Exception:
            return None

    @property
    def best_epoch(self) -> Optional[int]:
        return self.trainingInfo.best_epoch if self.trainingInfo is not None else None

    @property
    def total_epochs(self) -> Optional[int]:
        return self.trainingInfo.total_epochs if self.trainingInfo is not None else None

    def _tostring_excludes(self) -> List[str]:
        return ['_gpu', 'module', 'trainingInfo', "inputScaler", "outputScaler"]

    def _tostring_additional_entries(self):
        return dict(bestEpoch=self.best_epoch, totalEpochs=self.total_epochs)
class TorchModelFittingStrategy(ABC):
    """
    Interface of the fitting strategies that can be passed to TorchModel.fit
    """
    @abstractmethod
    def fit(self, model: TorchModel, data: TorchDataSetProvider, nn_optimiser: NNOptimiser) -> Optional[TrainingInfo]:
        """
        Fits the given model.

        :param model: the model to fit
        :param data: the provider of the data to use for fitting
        :param nn_optimiser: the optimiser that may be applied for fitting
        :return: information on the training process (which TorchModel.fit stores as the model's training info)
            or None
        """
class TorchModelFittingStrategyDefault(TorchModelFittingStrategy):
    """
    The default fitting strategy: the given optimiser is applied directly to the model and the data
    """
    def fit(self, model: TorchModel, data: TorchDataSetProvider, nn_optimiser: NNOptimiser) -> Optional[TrainingInfo]:
        # the result of the optimiser's fit call is passed through unchanged
        training_info = nn_optimiser.fit(model, data)
        return training_info
class TorchModelFromModuleFactory(TorchModel):
    """
    A TorchModel whose torch module is created by calling a factory with stored arguments
    """
    def __init__(self, module_factory: Callable[..., torch.nn.Module], *args, cuda: bool = True, **kwargs) -> None:
        """
        :param module_factory: the callable which creates the torch module
        :param args: the positional arguments to pass to the factory
        :param cuda: whether CUDA is to be used (passed on to TorchModel)
        :param kwargs: the keyword arguments to pass to the factory
        """
        super().__init__(cuda)
        self.moduleFactory = module_factory
        self.args = args
        self.kwargs = kwargs

    def create_torch_module(self) -> torch.nn.Module:
        factory = self.moduleFactory
        return factory(*self.args, **self.kwargs)
class TorchModelFromModule(TorchModel):
    """
    A TorchModel which wraps a torch module that already exists at construction time
    """
    def __init__(self, module: torch.nn.Module, cuda: bool = True):
        """
        :param module: the torch module to wrap
        :param cuda: whether CUDA is to be used (passed on to TorchModel)
        """
        super().__init__(cuda=cuda)
        # the module is set directly (after the base constructor has initialised the member with None)
        self.module = module

    def create_torch_module(self) -> torch.nn.Module:
        # no construction necessary: the module given at construction is returned as is
        existing_module = self.module
        return existing_module
class TorchModelFactoryFromModule:
    """A factory which creates a TorchModel (specifically a TorchModelFromModule) from a given torch module"""
    def __init__(self, module: torch.nn.Module, cuda: bool = True):
        """
        :param module: the torch module which created models shall wrap
        :param cuda: the CUDA flag to pass to created models
        """
        self.cuda = cuda
        self.module = module

    def __call__(self) -> TorchModel:
        # every call wraps the very same module instance that was given at construction
        return TorchModelFromModule(self.module, cuda=self.cuda)
class VectorTorchModel(TorchModel, ABC):
    """
    Base class for TorchModels which can be used within VectorModels and whose input and output dimensions
    are determined by the data
    """
    def __init__(self, cuda: bool = True) -> None:
        super().__init__(cuda=cuda)
        # both dimensions remain None until they are extracted from a data set provider
        self.inputDim = self.outputDim = None

    def _extract_params_from_data(self, data: TorchDataSetProvider) -> None:
        super()._extract_params_from_data(data)
        self.inputDim = data.get_input_dim()
        self.outputDim = data.get_model_output_dim()

    def create_torch_module(self) -> torch.nn.Module:
        input_dim, output_dim = self.inputDim, self.outputDim
        return self.create_torch_module_for_dims(input_dim, output_dim)

    @abstractmethod
    def create_torch_module_for_dims(self, input_dim: int, output_dim: int) -> torch.nn.Module:
        """
        Creates the torch module for the given dimensions.

        :param input_dim: the number of input dimensions as reported by the data set provider (number of columns
            in input data frame for default providers)
        :param output_dim: the number of output dimensions as reported by the data set provider (for default providers,
            this will be the number of columns in the output data frame or, for classification, the number of classes)
        :return: the torch module
        """
class TorchAutoregressiveResultHandler(ABC):
    """
    Interface for the storage of prediction results, which enables subsequent predictions
    to build on earlier predictions and thus supports autoregressive models.
    """

    @abstractmethod
    def clear_results(self):
        """
        Discards the results that were saved so far.
        """

    @abstractmethod
    def save_results(self, input_df: pd.DataFrame, results: np.ndarray) -> None:
        """
        Stores the given regression results such that subsequent prediction steps can use them as input.
        Since the input will typically be processed by a feature generator or vectoriser, the results
        should be stored in a place from which the respective feature generator or vectoriser can retrieve them.

        :param input_df: the input data frame for which the results were obtained (its number of rows corresponds to
            the length of `results`)
        :param results: the results array, which is typically a 2D array where `results[i]` is an array
            containing the results for the i-th input row
        """
# Type variable bound to TorchVectorRegressionModel; it annotates `self` and the return type of the fluent
# `with_*` methods so that calls on a subclass instance are typed as returning that subclass.
TTorchVectorRegressionModel = typing.TypeVar("TTorchVectorRegressionModel", bound="TorchVectorRegressionModel")
class TorchVectorRegressionModel(VectorRegressionModel):
    """
    Base class for the implementation of VectorRegressionModels based on TorchModels.
    An instance of this class will have an instance of TorchModel as the underlying model.
    """

    def __init__(self, torch_model_factory: Callable[[], TorchModel],
            normalisation_mode: NormalisationMode = NormalisationMode.NONE,
            nn_optimiser_params: Union[dict, NNOptimiserParams, None] = None) -> None:
        """
        :param torch_model_factory: the factory function with which to create the contained TorchModel instance that the instance is to
            encapsulate. For the instance to be picklable, this cannot be a lambda or locally defined function.
        :param normalisation_mode: the normalisation mode to apply to input data frames
        :param nn_optimiser_params: the parameters to apply in NNOptimiser during training; if no loss evaluator is
            specified therein, a regression loss evaluator using the MSE loss is set
        """
        super().__init__()

        if nn_optimiser_params is None:
            nn_optimiser_params_instance = NNOptimiserParams()
        else:
            nn_optimiser_params_instance = NNOptimiserParams.from_dict_or_instance(nn_optimiser_params)
        if nn_optimiser_params_instance.loss_evaluator is None:
            nn_optimiser_params_instance.loss_evaluator = NNLossEvaluatorRegression(NNLossEvaluatorRegression.LossFunction.MSELOSS)

        self.torch_model_factory = torch_model_factory
        self.normalisationMode = normalisation_mode
        self.nnOptimiserParams = nn_optimiser_params_instance
        self.model: Optional[TorchModel] = None
        self.inputTensoriser: Optional[Tensoriser] = None
        self.outputTensoriser: Optional[Tensoriser] = None
        self.outputTensorToArrayConverter: Optional[OutputTensorToArrayConverter] = None
        self.torchDataSetProviderFactory: Optional[TorchDataSetProviderFactory] = None
        self.dataFrameSplitter: Optional[DataFrameSplitter] = None
        self._normalisationCheckThreshold = 5
        self.inferenceBatchSize: Optional[int] = None
        self.autoregressiveResultHandler: Optional[TorchAutoregressiveResultHandler] = None

    def __setstate__(self, state) -> None:
        if "modelClass" in state:  # old-style factory
            state["torch_model_factory"] = functools.partial(state["modelClass"], *state["modelArgs"], **state["modelKwArgs"])
            for k in ("modelClass", "modelArgs", "modelKwArgs"):
                del state[k]
        state["nnOptimiserParams"] = NNOptimiserParams.from_dict_or_instance(state["nnOptimiserParams"])
        # the names must match the attribute names assigned in __init__ exactly, as they are the members
        # which old persisted states may lack
        new_optional_members = ["inputTensoriser", "torchDataSetProviderFactory", "dataFrameSplitter", "outputTensoriser",
            "outputTensorToArrayConverter", "inferenceBatchSize", "autoregressiveResultHandler"]
        new_default_properties = {"_normalisationCheckThreshold": 5}
        setstate(TorchVectorRegressionModel, self, state, new_optional_properties=new_optional_members,
            new_default_properties=new_default_properties)

    @classmethod
    def from_module(cls, module: torch.nn.Module, cuda=True, normalisation_mode: NormalisationMode = NormalisationMode.NONE,
            nn_optimiser_params: Optional[NNOptimiserParams] = None) -> "TorchVectorRegressionModel":
        """
        Creates a model instance whose underlying TorchModel wraps the given torch module.

        :param module: the torch module
        :param cuda: whether CUDA is to be used
        :param normalisation_mode: the normalisation mode to apply to input data frames
        :param nn_optimiser_params: the parameters to apply in NNOptimiser during training
        :return: the model instance
        """
        return cls(TorchModelFactoryFromModule(module=module, cuda=cuda), normalisation_mode=normalisation_mode,
            nn_optimiser_params=nn_optimiser_params)

    def _tostring_excludes(self) -> List[str]:
        excludes = super()._tostring_excludes()
        if self.model is not None:
            return excludes + ["modelClass", "modelArgs", "modelKwArgs"]
        else:
            return excludes

    def with_input_tensoriser(self: TTorchVectorRegressionModel, tensoriser: Tensoriser) -> TTorchVectorRegressionModel:
        """
        :param tensoriser: tensoriser to use in order to convert input data frames to (one or more) tensors.
            The default tensoriser directly converts the data frame's values (which is assumed to contain only scalars that
            can be coerced to floats) to a float tensor.
            The use of a custom tensoriser is necessary if a non-trivial conversion is necessary or if the data frame
            is to be converted to more than one input tensor.
        :return: self
        """
        self.inputTensoriser = tensoriser
        return self

    def with_output_tensoriser(self: TTorchVectorRegressionModel, tensoriser: RuleBasedTensoriser) -> TTorchVectorRegressionModel:
        """
        :param tensoriser: tensoriser to use in order to convert the output data frame to a tensor.
            The default output tensoriser directly converts the data frame's values to a float tensor.

            NOTE: It is required to be a rule-based tensoriser, because mechanisms that require fitting on the data
            and thus perform a data-dependent conversion are likely to cause problems because they would need
            to be reversed at inference time (since the model will be trained on the converted values). If you require
            a transformation, use a target transformer, which will be applied before the tensoriser.
        :return: self
        """
        self.outputTensoriser = tensoriser
        return self

    def with_output_tensor_to_array_converter(self: TTorchVectorRegressionModel,
            output_tensor_to_array_converter: "OutputTensorToArrayConverter") -> TTorchVectorRegressionModel:
        """
        Configures the use of a custom converter from tensors to numpy arrays, which is applied during inference.
        A custom converter can be required, for example, to handle variable-length outputs (where the output tensor
        will typically contain unwanted padding). Note that since the converter is for inference only, it may be
        required to use a custom loss evaluator during training if the use of a custom converter is necessary.

        :param output_tensor_to_array_converter: the converter
        :return: self
        """
        self.outputTensorToArrayConverter = output_tensor_to_array_converter
        return self

    def with_torch_data_set_provider_factory(self: TTorchVectorRegressionModel,
            torch_data_set_provider_factory: "TorchDataSetProviderFactory") -> TTorchVectorRegressionModel:
        """
        :param torch_data_set_provider_factory: the torch data set provider factory, which is used to instantiate the provider which
            will provide the training and validation data sets from the input data frame that is passed in for learning.
            By default, TorchDataSetProviderFactoryRegressionDefault is used.
        :return: self
        """
        self.torchDataSetProviderFactory = torch_data_set_provider_factory
        return self

    def with_data_frame_splitter(self: TTorchVectorRegressionModel, data_frame_splitter: DataFrameSplitter) -> TTorchVectorRegressionModel:
        """
        :param data_frame_splitter: the data frame splitter which is used to split the input/output data frames that are passed for
            learning into a data frame that is used for training and a data frame that is used for validation.
            The input data frame is the data frame that is passed as input to the splitter, and the returned indices
            are used to split both the input and output data frames in the same way.
        :return: self
        """
        self.dataFrameSplitter = data_frame_splitter
        return self

    def with_normalisation_check_threshold(self: TTorchVectorRegressionModel, threshold: Optional[float]) -> TTorchVectorRegressionModel:
        """
        Defines a threshold with which to check inputs that are passed to the underlying neural network.
        Whenever an (absolute) input value exceeds the threshold, a warning is triggered.

        :param threshold: the threshold
        :return: self
        """
        self._normalisationCheckThreshold = threshold
        return self

    def with_autoregressive_result_handler(self: TTorchVectorRegressionModel,
            result_handler: TorchAutoregressiveResultHandler,
            inference_batch_size=1) -> TTorchVectorRegressionModel:
        """
        Adds a result handler which can be used to store prediction results such that subsequent predictions
        can use the prediction result, supporting autoregressive models.
        The autoregressive predictions are assumed to be handled in a single call to method :meth:`predict`,
        and the results will be stored for the duration of the call.
        For autoregressive predictions that build on earlier predictions, we must typically restrict
        the batch size such that predictions from the earlier batch can be saved and correctly reused
        as input for the subsequent predictions. The model's input preprocessors (such as feature generators
        or vectorisers) must ensure that the results being stored by the result handler are appropriately
        used as input.

        :param result_handler: the result handler
        :param inference_batch_size: the batch size to use for predictions
        :return: self
        """
        self.autoregressiveResultHandler = result_handler
        self.inferenceBatchSize = inference_batch_size
        return self

    def _create_torch_model(self) -> TorchModel:
        torch_model = self.torch_model_factory()
        torch_model.set_normalisation_check_threshold(self._normalisationCheckThreshold)
        return torch_model

    def _create_data_set_provider(self, inputs: pd.DataFrame, outputs: pd.DataFrame) -> TorchDataSetProvider:
        factory = self.torchDataSetProviderFactory
        if factory is None:
            factory = TorchDataSetProviderFactoryRegressionDefault()
        return factory.create_data_set_provider(inputs, outputs, self, self._trainingContext, input_tensoriser=self.inputTensoriser,
            output_tensoriser=self.outputTensoriser, data_frame_splitter=self.dataFrameSplitter)

    def _fit(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[pd.Series] = None) -> None:
        self._warn_sample_weights_unsupported(False, weights)
        if self.inputTensoriser is not None:
            log.info(f"Fitting {self.inputTensoriser} ...")
            self.inputTensoriser.fit(inputs, model=self)
        self.model = self._create_torch_model()
        data_set_provider = self._create_data_set_provider(inputs, outputs)
        self.model.fit(data_set_provider, self.nnOptimiserParams)

    def _predict_outputs_for_input_data_frame(self, inputs: pd.DataFrame) -> np.ndarray:
        """
        Applies the underlying model batch-wise to the given inputs.

        :param inputs: the input data frame
        :return: the concatenation of the result arrays obtained for the batches
        """
        result_handler = self.autoregressiveResultHandler
        is_autoregressive = result_handler is not None
        if is_autoregressive:
            result_handler.clear_results()
        batch_size = self.nnOptimiserParams.batch_size if self.inferenceBatchSize is None else self.inferenceBatchSize
        results: List[np.ndarray] = []
        # for autoregressive models, inputs must be dynamically tensorised to allow inclusion of predicted results
        data_set = TorchDataSetFromDataFrames(inputs, None, self.model.cuda, input_tensoriser=self.inputTensoriser,
            tensorise_dynamically=is_autoregressive)
        start_idx = 0
        try:
            for input_batch in data_set.iter_batches(batch_size, input_only=True):
                if self.outputTensorToArrayConverter is None:
                    result = self.model.apply_scaled(input_batch, as_numpy=True)
                else:
                    output_batch = self.model.apply_scaled(input_batch, as_numpy=False)
                    result = self.outputTensorToArrayConverter.convert(output_batch, input_batch)
                if is_autoregressive:
                    result_handler.save_results(inputs.iloc[start_idx:start_idx+len(result)], result)
                start_idx += len(result)
                results.append(result)
        finally:
            # results are to be stored for the duration of the call only, also if the prediction fails
            if is_autoregressive:
                result_handler.clear_results()
        return np.concatenate(results)

    def _predict(self, inputs: pd.DataFrame) -> pd.DataFrame:
        y_array = self._predict_outputs_for_input_data_frame(inputs)
        return pd.DataFrame(y_array, columns=self.get_model_output_variable_names())
# Type variable bound to TorchVectorClassificationModel; it annotates `self` and the return type of the fluent
# `with_*` methods so that calls on a subclass instance are typed as returning that subclass.
TTorchVectorClassificationModel = typing.TypeVar("TTorchVectorClassificationModel", bound="TorchVectorClassificationModel")
652class TorchVectorClassificationModel(VectorClassificationModel):
653 """
654 Base class for the implementation of VectorClassificationModels based on TorchModels.
655 An instance of this class will have an instance of TorchModel as the underlying model.
656 """
657 def __init__(self, output_mode: ClassificationOutputMode,
658 torch_model_factory: Callable[[], TorchModel],
659 normalisation_mode: NormalisationMode = NormalisationMode.NONE,
660 nn_optimiser_params: Optional[NNOptimiserParams] = None) -> None:
661 """
662 :param output_mode: specifies the nature of the output of the underlying neural network model
663 :param torch_model_factory: the factory function with which to create the contained TorchModel instance that the instance is to
664 encapsulate. For the instance to be picklable, this cannot be a lambda or locally defined function.
665 :param normalisation_mode: the normalisation mode to apply to input data frames
666 :param nn_optimiser_params: the parameters to apply in NNOptimiser during training
667 """
668 super().__init__()
670 if nn_optimiser_params is None:
671 nn_optimiser_params = NNOptimiserParams()
672 if nn_optimiser_params.loss_evaluator is None:
673 loss_function = NNLossEvaluatorClassification.LossFunction.default_for_output_mode(output_mode)
674 nn_optimiser_params.loss_evaluator = NNLossEvaluatorClassification(loss_function)
676 self.outputMode = output_mode
677 self.torch_model_factory = torch_model_factory
678 self.normalisationMode = normalisation_mode
679 self.nnOptimiserParams: NNOptimiserParams = nn_optimiser_params
680 self.model: Optional[TorchModel] = None
681 self.inputTensoriser: Optional[Tensoriser] = None
682 self.outputTensoriser: Optional[Tensoriser] = None
683 self.torchDataSetProviderFactory: Optional[TorchDataSetProviderFactory] = None
684 self.dataFrameSplitter: Optional[DataFrameSplitter] = None
685 self._normalisationCheckThreshold = 5
# noinspection DuplicatedCode
def __setstate__(self, state) -> None:
    """
    Restores the instance from a persisted state dictionary, upgrading legacy representations on the way.

    If the state still describes the torch model via the old-style entries "modelClass", "modelArgs" and
    "modelKwArgs", these are replaced by an equivalent "torch_model_factory" entry (a functools.partial).
    The optimiser parameters are converted via NNOptimiserParams.from_dict_or_instance, and the resulting
    state is then handed to ``setstate`` together with the names of optional members and the default values
    for properties that may be absent from the state.

    :param state: the state dictionary; it is modified in place
    """
    legacy_factory_keys = ("modelClass", "modelArgs", "modelKwArgs")
    if "modelClass" in state:  # old-style factory
        model_class, model_args, model_kwargs = [state[key] for key in legacy_factory_keys]
        state["torch_model_factory"] = functools.partial(model_class, *model_args, **model_kwargs)
        for key in legacy_factory_keys:
            del state[key]
    state["nnOptimiserParams"] = NNOptimiserParams.from_dict_or_instance(state["nnOptimiserParams"])
    setstate(
        TorchVectorClassificationModel,
        self,
        state,
        new_optional_properties=["inputTensoriser", "torchDataSetProviderFactory", "dataFrameSplitter", "outputTensoriser"],
        new_default_properties={"outputMode": ClassificationOutputMode.PROBABILITIES, "_normalisationCheckThreshold": 5},
    )
@classmethod
def from_module(cls, module: torch.nn.Module, output_mode: ClassificationOutputMode, cuda=True,
        normalisation_mode: NormalisationMode = NormalisationMode.NONE,
        nn_optimiser_params: Optional[NNOptimiserParams] = None) -> "TorchVectorClassificationModel":
    """
    Alternative constructor which creates a model instance for an existing torch module.

    :param module: the torch module, which is wrapped in a TorchModelFactoryFromModule
    :param output_mode: specifies the nature of the module's output
    :param cuda: passed on to TorchModelFactoryFromModule (presumably whether to use CUDA - to be confirmed there)
    :param normalisation_mode: the normalisation mode, passed on to the constructor
    :param nn_optimiser_params: the optimiser parameters, passed on to the constructor
    :return: the new instance
    """
    module_factory = TorchModelFactoryFromModule(module, cuda=cuda)
    return cls(output_mode, module_factory, normalisation_mode=normalisation_mode, nn_optimiser_params=nn_optimiser_params)
def with_input_tensoriser(self: TTorchVectorClassificationModel, tensoriser: Tensoriser) -> TTorchVectorClassificationModel:
    """
    Sets the tensoriser which converts input data frames to (one or more) tensors.

    Without a custom tensoriser, the data frame's values (assumed to be scalars that can be coerced to floats)
    are converted directly to a float tensor. A custom tensoriser is required whenever the conversion is
    non-trivial or the data frame is to be converted to more than one input tensor.

    :param tensoriser: the tensoriser to use for input data frames
    :return: self
    """
    self.inputTensoriser = tensoriser
    return self
def with_output_tensoriser(self: TTorchVectorClassificationModel, tensoriser: RuleBasedTensoriser) -> TTorchVectorClassificationModel:
    """
    Sets the tensoriser which converts the output data frame to a tensor.

    NOTE: The tensoriser is required to be rule-based. A tensoriser that needs to be fitted on the data performs
    a data-dependent conversion, which is likely to cause problems, because the conversion would have to be
    reversed at inference time (the model being trained on the converted values). If a transformation is
    required, use a target transformer, which will be applied before the tensoriser.

    :param tensoriser: the (rule-based) tensoriser to use for the output data frame
    :return: self
    """
    self.outputTensoriser = tensoriser
    return self
def with_torch_data_set_provider_factory(
        self: TTorchVectorClassificationModel,
        torch_data_set_provider_factory: "TorchDataSetProviderFactory") -> TTorchVectorClassificationModel:
    """
    Sets the factory used to instantiate the torch data set provider, i.e. the object which provides the
    training and validation data sets from the data frames that are passed in for learning.
    If no factory is set, TorchDataSetProviderFactoryClassificationDefault is used.

    :param torch_data_set_provider_factory: the torch data set provider factory
    :return: self
    """
    self.torchDataSetProviderFactory = torch_data_set_provider_factory
    return self
def with_data_frame_splitter(
        self: TTorchVectorClassificationModel,
        data_frame_splitter: DataFrameSplitter) -> TTorchVectorClassificationModel:
    """
    Sets the splitter which divides the input/output data frames passed for learning into a part used for
    training and a part used for validation.
    The splitter receives the input data frame, and the indices it returns are applied to both the input and
    the output data frame, such that both are split in the same way.

    :param data_frame_splitter: the data frame splitter
    :return: self
    """
    self.dataFrameSplitter = data_frame_splitter
    return self
def with_normalisation_check_threshold(
        self: TTorchVectorClassificationModel,
        threshold: Optional[float]) -> TTorchVectorClassificationModel:
    """
    Sets the threshold against which inputs passed to the underlying neural network are checked:
    a warning is triggered whenever the absolute value of an input exceeds the threshold.
    The value is stored here and applied to the torch model when it is created.

    :param threshold: the threshold (the meaning of None is defined by the torch model - presumably it disables
        the check; to be confirmed there)
    :return: self
    """
    self._normalisationCheckThreshold = threshold
    return self
def _create_torch_model(self) -> TorchModel:
    """
    Creates a new torch model via the configured factory and applies the normalisation check threshold to it.

    :return: the newly created torch model
    """
    created_model = self.torch_model_factory()
    created_model.set_normalisation_check_threshold(self._normalisationCheckThreshold)
    return created_model
def _create_data_set_provider(self, inputs: pd.DataFrame, outputs: pd.DataFrame) -> TorchDataSetProvider:
    """
    Creates the data set provider for the given data, using the configured provider factory or, if none
    was configured, a TorchDataSetProviderFactoryClassificationDefault.

    :param inputs: the input data frame
    :param outputs: the output data frame
    :return: the data set provider created by the factory
    """
    provider_factory = self.torchDataSetProviderFactory
    if provider_factory is None:
        # no factory configured: fall back to the default for classification
        provider_factory = TorchDataSetProviderFactoryClassificationDefault()
    return provider_factory.create_data_set_provider(
        inputs,
        outputs,
        self,
        self._trainingContext,
        input_tensoriser=self.inputTensoriser,
        output_tensoriser=self.outputTensoriser,
        data_frame_splitter=self.dataFrameSplitter,
    )
776 def _fit_classifier(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[pd.Series] = None) -> None:
777 self._warn_sample_weights_unsupported(False, weights)
778 if len(outputs.columns) != 1:
779 raise ValueError("Expected one output dimension: the class labels")
781 if self.inputTensoriser is not None:
782 log.info(f"Fitting {self.inputTensoriser} ...")
783 self.inputTensoriser.fit(inputs, model=self)
785 # transform outputs: for each data point, the new output shall be the index in the list of labels
786 labels: pd.Series = outputs.iloc[:, 0]
787 outputs = pd.DataFrame([self._labels.index(l) for l in labels], columns=outputs.columns, index=outputs.index)
789 self.model = self._create_torch_model()
791 data_set_provider = self._create_data_set_provider(inputs, outputs)
792 self.model.fit(data_set_provider, self.nnOptimiserParams)
def _predict_outputs_for_input_data_frame(self, inputs: pd.DataFrame) -> torch.Tensor:
    """
    Applies the torch model to the given inputs in batches (using the batch size of the optimiser parameters)
    and concatenates the per-batch results along the first dimension.

    :param inputs: the input data frame
    :return: the tensor of concatenated model outputs
    """
    batch_size = self.nnOptimiserParams.batch_size
    data_set = TorchDataSetFromDataFrames(inputs, None, self.model.cuda, input_tensoriser=self.inputTensoriser)
    batch_outputs = [
        self.model.apply_scaled(input_batch, as_numpy=False)
        for input_batch in data_set.iter_batches(batch_size, input_only=True)
    ]
    return torch.cat(batch_outputs, dim=0)
def _predict_class_probabilities(self, inputs: pd.DataFrame) -> pd.DataFrame:
    """
    Computes the model outputs for the given inputs and converts them to class probabilities
    in accordance with the configured output mode.

    :param inputs: the input data frame
    :return: a data frame with one column per class label
    :raises ValueError: if the output mode is not one of the handled modes
    """
    raw_output = self._predict_outputs_for_input_data_frame(inputs)
    output_mode = self.outputMode
    if output_mode == ClassificationOutputMode.LOG_PROBABILITIES:
        probabilities = raw_output.exp()
    elif output_mode == ClassificationOutputMode.UNNORMALISED_LOG_PROBABILITIES:
        probabilities = raw_output.softmax(dim=1)
    elif output_mode == ClassificationOutputMode.PROBABILITIES:
        # the model output can be used as is
        probabilities = raw_output
    else:
        raise ValueError(f"Unhandled output mode {self.outputMode}")
    return pd.DataFrame(probabilities.numpy(), columns=self._labels)
def _tostring_excludes(self) -> List[str]:
    """
    :return: the attribute names to exclude from the string representation: the names returned by the
        super-class, extended by the legacy model factory attribute names once a torch model is present
    """
    base_excludes = super()._tostring_excludes()
    if self.model is None:
        return base_excludes
    return base_excludes + ["modelClass", "modelArgs", "modelKwArgs"]
class TorchDataSetProviderFactory(ABC):
    """
    Base class for factories which create the TorchDataSetProvider that a torch-based vector model trains on.
    """

    @abstractmethod
    def create_data_set_provider(
            self,
            inputs: pd.DataFrame,
            outputs: pd.DataFrame,
            model: Union[TorchVectorRegressionModel, TorchVectorClassificationModel],
            training_context: TrainingContext,
            input_tensoriser: Optional[Tensoriser],
            output_tensoriser: Optional[Tensoriser],
            data_frame_splitter: Optional[DataFrameSplitter]) -> TorchDataSetProvider:
        """
        :param inputs: the input data frame
        :param outputs: the output data frame
        :param model: the model for which the data set provider is to be created
        :param training_context: the training context
        :param input_tensoriser: the tensoriser for the inputs (if any)
        :param output_tensoriser: the tensoriser for the outputs (if any)
        :param data_frame_splitter: the data frame splitter (if any)
        :return: the data set provider
        """
class TorchDataSetProviderFactoryClassificationDefault(TorchDataSetProviderFactory):
    """
    Data set provider factory for classification models, which creates the provider from a
    ClassificationVectorDataUtil.
    """

    def __init__(self, tensorise_dynamically=False):
        """
        :param tensorise_dynamically: whether tensorisation shall take place on the fly whenever the provided data sets are
            iterated; if False, tensorisation takes place once in a precomputation stage (tensors must jointly fit into memory)
        """
        self.tensoriseDynamically = tensorise_dynamically

    def create_data_set_provider(
            self,
            inputs: pd.DataFrame,
            outputs: pd.DataFrame,
            model: TorchVectorClassificationModel,
            training_context: TrainingContext,
            input_tensoriser: Optional[Tensoriser],
            output_tensoriser: Optional[Tensoriser],
            data_frame_splitter: Optional[DataFrameSplitter]) -> TorchDataSetProvider:
        """
        :param inputs: the input data frame
        :param outputs: the output data frame
        :param model: the classification model, from which the cuda flag of the torch model, the number of
            labels and the normalisation mode are read
        :param training_context: the training context (not used by this implementation)
        :param input_tensoriser: the tensoriser for the inputs (if any)
        :param output_tensoriser: the tensoriser for the outputs (if any)
        :param data_frame_splitter: the data frame splitter (if any)
        :return: the data set provider
        """
        vector_data_util = ClassificationVectorDataUtil(
            inputs,
            outputs,
            model.model.cuda,
            len(model._labels),  # TODO FIXME
            normalisation_mode=model.normalisationMode,
            input_tensoriser=input_tensoriser,
            output_tensoriser=output_tensoriser,
            data_frame_splitter=data_frame_splitter,
        )
        return TorchDataSetProviderFromVectorDataUtil(
            vector_data_util,
            model.model.cuda,
            tensorise_dynamically=self.tensoriseDynamically,
        )
class TorchDataSetProviderFactoryRegressionDefault(TorchDataSetProviderFactory):
    """
    Data set provider factory for regression models, which creates the provider from a VectorDataUtil.
    """

    def __init__(self, tensorise_dynamically=False):
        """
        :param tensorise_dynamically: whether tensorisation shall take place on the fly whenever the provided data sets are
            iterated; if False, tensorisation takes place once in a precomputation stage (tensors must jointly fit into memory)
        """
        self.tensoriseDynamically = tensorise_dynamically

    def create_data_set_provider(
            self,
            inputs: pd.DataFrame,
            outputs: pd.DataFrame,
            model: TorchVectorRegressionModel,
            training_context: TrainingContext,
            input_tensoriser: Optional[Tensoriser],
            output_tensoriser: Optional[Tensoriser],
            data_frame_splitter: Optional[DataFrameSplitter]) -> TorchDataSetProvider:
        """
        :param inputs: the input data frame
        :param outputs: the output data frame
        :param model: the regression model, from which the cuda flag of the torch model and the
            normalisation mode are read
        :param training_context: the training context (not used by this implementation)
        :param input_tensoriser: the tensoriser for the inputs (if any)
        :param output_tensoriser: the tensoriser for the outputs (if any)
        :param data_frame_splitter: the data frame splitter (if any)
        :return: the data set provider
        """
        vector_data_util = VectorDataUtil(
            inputs,
            outputs,
            model.model.cuda,
            normalisation_mode=model.normalisationMode,
            input_tensoriser=input_tensoriser,
            output_tensoriser=output_tensoriser,
            data_frame_splitter=data_frame_splitter,
        )
        return TorchDataSetProviderFromVectorDataUtil(
            vector_data_util,
            model.model.cuda,
            tensorise_dynamically=self.tensoriseDynamically,
        )
class OutputTensorToArrayConverter(ABC):
    """
    Base class for converters which turn a model's output tensor into a numpy array.
    """

    @abstractmethod
    def convert(self, model_output: torch.Tensor, model_input: Union[torch.Tensor, Sequence[torch.Tensor]]) -> np.ndarray:
        """
        Converts the given model output to an array.

        :param model_output: the output tensor generated by the model
        :param model_input: the input tensor(s) for which the model produced the output (which may provide relevant meta-data)
        :return: a numpy array of shape (N, D) where N=model_output.shape[0] is the number of data points and D is the
            number of variables predicted by the model
        """