Coverage for src/sensai/torch/torch_base.py: 70%
452 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-13 22:17 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-13 22:17 +0000
1import functools
2import io
3import logging
4import typing
5from abc import ABC, abstractmethod
6from typing import Union, Tuple, Callable, Optional, List, Sequence
8import numpy as np
9import pandas as pd
10import torch
11from torch import nn
12from torch.nn import functional as F
14from .torch_data import TensorScaler, VectorDataUtil, ClassificationVectorDataUtil, TorchDataSet, \
15 TorchDataSetProvider, Tensoriser, TorchDataSetFromDataFrames, RuleBasedTensoriser, \
16 TorchDataSetProviderFromVectorDataUtil
17from .torch_enums import ClassificationOutputMode
18from .torch_opt import NNOptimiser, NNLossEvaluatorRegression, NNLossEvaluatorClassification, NNOptimiserParams, TrainingInfo
19from ..data import DataFrameSplitter
20from ..normalisation import NormalisationMode
21from ..util.dtype import to_float_array
22from ..util.pickle import setstate
23from ..util.string import ToStringMixin
24from ..vector_model import VectorRegressionModel, VectorClassificationModel, TrainingContext
26log: logging.Logger = logging.getLogger(__name__)
class MCDropoutCapableNNModule(nn.Module, ABC):
    """
    Base class for NN modules that shall support MC-Dropout.

    A subclass adds support by routing the relevant tensors through :meth:`_dropout` within its forward method.
    Inference that samples results is then obtained by calling :meth:`infer_mc_dropout` rather than invoking
    the module directly via ``__call__``.
    """

    def __init__(self) -> None:
        super().__init__()
        # MC-Dropout is disabled by default; infer_mc_dropout enables it only for the duration of its sampling loop
        self._applyMCDropout = False
        self._pMCDropoutOverride = None

    def __setstate__(self, d: dict) -> None:
        # persisted states which lack the MC-Dropout attributes receive the same defaults as in __init__
        d.setdefault("_applyMCDropout", False)
        d.setdefault("_pMCDropoutOverride", None)
        super().__setstate__(d)

    def _dropout(self, x: torch.Tensor, p_training=None, p_inference=None) -> torch.Tensor:
        """
        This method is to be applied within the module's forward method in order to apply dropouts during training
        and/or MC-Dropout-based inference.

        :param x: the tensor to which dropout shall (potentially) be applied
        :param p_training: the dropout probability to use while the module is in training mode; if None, no dropout
            is applied during training
        :param p_inference: the dropout probability to use during MC-Dropout-based inference (via infer_mc_dropout,
            whose optional argument may override this probability); if None, no dropout is applied during inference
        :return: x itself or a version of x with some elements dropped out, depending on the module's mode and the
            given probabilities
        """
        if self.training:
            return x if p_training is None else F.dropout(x, p_training)
        if self._applyMCDropout and p_inference is not None:
            override = self._pMCDropoutOverride
            return F.dropout(x, override if override is not None else p_inference)
        return x

    def _enable_mc_dropout(self, enabled=True, p_mc_dropout_override=None) -> None:
        """
        Switches MC-Dropout on or off for subsequent calls of :meth:`_dropout` outside of training mode.

        :param enabled: whether MC-Dropout shall be applied
        :param p_mc_dropout_override: the probability replacing the inference probability passed to _dropout;
            if None, the probability passed to _dropout is used
        """
        self._applyMCDropout = enabled
        self._pMCDropoutOverride = p_mc_dropout_override

    def infer_mc_dropout(self, x: Union[torch.Tensor, Sequence[torch.Tensor]], num_samples, p=None) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Applies inference using MC-Dropout, drawing the given number of samples.

        :param x: the model input (a tensor or a tuple/list of tensors, which are passed as separate arguments)
        :param num_samples: the number of samples to draw, i.e. the number of forward passes
        :param p: the dropout probability to apply, overriding the probability specified in the model's forward
            method; if None, use the model's default
        :return: a pair (y, sd) where y is the mean output tensor across all samples and sd is a tensor of the same
            dimension containing the (population) standard deviations
        """
        inputs = x if type(x) in (tuple, list) else [x]
        self._enable_mc_dropout(True, p_mc_dropout_override=p)
        try:
            samples = [self(*inputs) for _ in range(num_samples)]
        finally:
            # always restore regular inference behaviour, even if a forward pass failed
            self._enable_mc_dropout(False)
        stacked = torch.stack(samples)
        mean = torch.mean(stacked, 0)
        stddev = torch.std(stacked, 0, unbiased=False)
        return mean, stddev
class TorchModel(ABC, ToStringMixin):
    """
    sensAI abstraction for torch models, which supports one-line training, allows for convenient model application,
    has basic mechanisms for data scaling, and soundly handles persistence (via pickle).
    An instance wraps a torch.nn.Module, which is constructed on demand during training via the factory method
    create_torch_module.
    """
    log: logging.Logger = log.getChild(__qualname__)

    def __init__(self, cuda=True) -> None:
        """
        :param cuda: whether the model is to be applied using CUDA
        """
        self.cuda: bool = cuda
        self.module: Optional[torch.nn.Module] = None
        self.outputScaler: Optional[TensorScaler] = None
        self.inputScaler: Optional[TensorScaler] = None
        self.trainingInfo: Optional[TrainingInfo] = None
        self._gpu: Optional[int] = None
        self._normalisationCheckThreshold: Optional[int] = 5

    def _tostring_exclude_private(self) -> bool:
        return True

    def set_torch_module(self, module: torch.nn.Module) -> None:
        self.module = module

    def set_normalisation_check_threshold(self, threshold: Optional[float]) -> None:
        """
        :param threshold: the absolute value above which an input value triggers a warning when the model is applied;
            if None, inputs are not checked
        """
        self._normalisationCheckThreshold = threshold

    def get_module_bytes(self) -> bytes:
        """
        :return: the serialised representation of the wrapped module as produced by torch.save
        """
        bytes_io = io.BytesIO()
        torch.save(self.module, bytes_io)
        return bytes_io.getvalue()

    def set_module_bytes(self, model_bytes: bytes) -> None:
        """
        Restores the wrapped module from a serialised representation.

        :param model_bytes: the bytes as returned by get_module_bytes
        """
        model_file = io.BytesIO(model_bytes)
        self._load_model(model_file)

    def get_torch_module(self) -> torch.nn.Module:
        return self.module

    def _set_cuda_enabled(self, is_cuda_enabled: bool) -> None:
        self.cuda = is_cuda_enabled

    def _is_cuda_enabled(self) -> bool:
        return self.cuda

    @staticmethod
    def _rewind_model_file(model_file) -> None:
        """
        Resets a file-like object to its beginning so that it can be read again; paths are left untouched.
        """
        if hasattr(model_file, "seek"):
            model_file.seek(0)

    def _load_model(self, model_file) -> None:
        """
        Loads the torch module from the given source.
        If regular loading fails and CUDA is enabled, loading is retried with the model being mapped to the first
        CUDA device (if any) and ultimately to the CPU; if the model ends up on the CPU, CUDA is disabled for this
        instance.

        :param model_file: a path or a (seekable) file-like object, as accepted by torch.load
        """
        # NOTE(review): torch.load unpickles data and must therefore only be applied to trusted sources
        try:
            self.module = torch.load(model_file)
            self._gpu = self._get_gpu_from_model_parameter_device()
        except Exception:
            if not self._is_cuda_enabled():
                raise
            new_device = "cuda:0" if torch.cuda.device_count() > 0 else "cpu"
            self.log.warning(f"Loading of CUDA model failed, trying to map model to device {new_device}...")
            self._rewind_model_file(model_file)
            try:
                self.module = torch.load(model_file, map_location=new_device)
            except Exception:
                if new_device == "cpu":
                    # there is no further fallback; do not pretend that loading succeeded
                    raise
                self.log.warning(f"Failure to map model to device {new_device}, trying CPU...")
                new_device = "cpu"
                # the failed attempt may have consumed part of the file
                self._rewind_model_file(model_file)
                self.module = torch.load(model_file, map_location=new_device)
            if new_device == "cpu":
                self._set_cuda_enabled(False)
                self._gpu = None
            else:
                self._gpu = 0
            self.log.info(f"Model successfully loaded to {new_device}")

    @abstractmethod
    def create_torch_module(self) -> torch.nn.Module:
        """
        :return: the torch module that this instance is to wrap
        """
        pass

    def __getstate__(self) -> dict:
        # the module is persisted in serialised form (via torch.save) rather than as a regular attribute
        state = dict(self.__dict__)
        del state["module"]
        state["modelBytes"] = self.get_module_bytes()
        return state

    def __setstate__(self, d: dict) -> None:
        # backward compatibility
        if "bestEpoch" in d:
            d["trainingInfo"] = TrainingInfo(best_epoch=d["bestEpoch"])
            del d["bestEpoch"]
        new_default_properties = {"_normalisationCheckThreshold": 5}

        model_bytes = d.pop("modelBytes", None)
        setstate(TorchModel, self, d, new_default_properties=new_default_properties)
        if model_bytes is not None:
            self.set_module_bytes(model_bytes)

    def apply(self,
            x: Union[torch.Tensor, np.ndarray, TorchDataSet, Sequence[torch.Tensor]],
            as_numpy: bool = True, create_batch: bool = False,
            mc_dropout_samples: Optional[int] = None,
            mc_dropout_probability: Optional[float] = None,
            scale_output: bool = False,
            scale_input: bool = False) -> Union[torch.Tensor, np.ndarray, Tuple]:
        """
        Applies the model to the given input tensor and returns the result

        :param x: the input tensor (either a batch or, if create_batch=True, a single data point), a data set or a tuple/list of tensors
            (if the model accepts more than one input).
            If it is a data set, it will be processed at once, so the data set must not be too large to be processed at once.
        :param as_numpy: flag indicating whether to convert the result to a numpy.array (if False, return tensor)
        :param create_batch: whether to add an additional tensor dimension for a batch containing just one data point
        :param mc_dropout_samples: if not None, apply MC-Dropout-based inference with the respective number of samples; if None, apply
            regular inference
        :param mc_dropout_probability: the probability with which to apply dropouts in MC-Dropout-based inference; if None, use model's
            default
        :param scale_output: whether to scale the output that is produced by the underlying model (using this instance's output scaler)
        :param scale_input: whether to scale the input (using this instance's input scaler) before applying the underlying model

        :return: an output tensor or, if MC-Dropout is applied, a pair (y, sd) where y the mean output tensor and sd is a tensor of the
            same dimension containing standard deviations
        """
        def extract(z):
            if scale_output:
                z = self.scaled_output(z)
            if self._is_cuda_enabled():
                z = z.cpu()
            z = z.detach()
            if as_numpy:
                z = z.numpy()
            return z

        model = self.get_torch_module()
        model.eval()

        if isinstance(x, TorchDataSet):
            x = next(x.iter_batches(x.size(), input_only=True, shuffle=False))
        elif isinstance(x, np.ndarray):
            x = to_float_array(x)
            x = torch.from_numpy(x).float()

        if type(x) not in (list, tuple):
            inputs = [x]
        else:
            inputs = x

        if self._is_cuda_enabled():
            torch.cuda.set_device(self._gpu)
            inputs = [t.cuda() for t in inputs]
        if scale_input:
            inputs = [self.inputScaler.normalise(t) for t in inputs]
        if create_batch:
            # each tensor is reshaped based on its own size (x may be a list/tuple of tensors)
            inputs = [t.view(1, *t.size()) for t in inputs]

        # check input normalisation
        if self._normalisationCheckThreshold is not None:
            for i, t in enumerate(inputs):
                if t.is_floating_point() and t.numel() > 0:  # skip any integer tensors (which typically contain lengths) and empty tensors
                    max_value = t.abs().max().item()
                    if max_value > self._normalisationCheckThreshold:
                        log.warning("Received value in input tensor %d which is likely to not be correctly normalised: "
                            "maximum abs. value in tensor is %f", i, max_value)

        if mc_dropout_samples is None:
            y = model(*inputs)
            return extract(y)
        else:
            # the preprocessed inputs (device, scaling, batch dimension) are used, as in regular inference
            # NOTE(review): with scale_output=True, the standard deviations pass through the same output
            #   denormalisation as the means - confirm that this is intended for the scaler in use
            y, stddev = model.infer_mc_dropout(inputs, mc_dropout_samples, p=mc_dropout_probability)
            return extract(y), extract(stddev)

    def apply_scaled(self, x: Union[torch.Tensor, np.ndarray, TorchDataSet, Sequence[torch.Tensor]],
            as_numpy: bool = True,
            create_batch: bool = False,
            mc_dropout_samples: Optional[int] = None,
            mc_dropout_probability: Optional[float] = None) \
            -> Union[torch.Tensor, np.ndarray]:
        """
        applies the model to the given input tensor and returns the scaled result (i.e. in the original scale)

        :param x: the input tensor(s) or data set
        :param as_numpy: flag indicating whether to convert the result to a numpy.array (if False, return tensor)
        :param create_batch: whether to add an additional tensor dimension for a batch containing just one data point
        :param mc_dropout_samples: if not None, apply MC-Dropout-based inference with the respective number of samples; if None, apply
            regular inference
        :param mc_dropout_probability: the probability with which to apply dropouts in MC-Dropout-based inference; if None, use model's
            default

        :return: a scaled output tensor or, if MC-Dropout is applied, a pair (y, sd) of scaled tensors, where
            y the mean output tensor and sd is a tensor of the same dimension containing standard deviations
        """
        return self.apply(x, scale_output=True, scale_input=True, as_numpy=as_numpy, create_batch=create_batch,
            mc_dropout_samples=mc_dropout_samples, mc_dropout_probability=mc_dropout_probability)

    def scaled_output(self, output: torch.Tensor) -> torch.Tensor:
        """
        :param output: a tensor as produced by the underlying module
        :return: the tensor after denormalisation by this instance's output scaler
        """
        return self.outputScaler.denormalise(output)

    def _extract_params_from_data(self, data: TorchDataSetProvider) -> None:
        self.outputScaler = data.get_output_tensor_scaler()
        self.inputScaler = data.get_input_tensor_scaler()

    def fit(self, data: TorchDataSetProvider, nn_optimiser_params: NNOptimiserParams, strategy: "TorchModelFittingStrategy" = None) \
            -> None:
        """
        Fits this model using the given data and strategy

        :param data: a provider for the data with which to fit the model
        :param strategy: the fitting strategy; if None, use TorchModelFittingStrategyDefault.
            Pass your own strategy to perform custom fitting processes, e.g. process which involve multi-stage learning
        :param nn_optimiser_params: the parameters with which to create an optimiser which can be applied in the fitting strategy
        """
        self._extract_params_from_data(data)
        optimiser = NNOptimiser(nn_optimiser_params)
        if strategy is None:
            strategy = TorchModelFittingStrategyDefault()
        self.trainingInfo = strategy.fit(self, data, optimiser)
        self._gpu = self._get_gpu_from_model_parameter_device()

    def _get_gpu_from_model_parameter_device(self) -> Optional[int]:
        """
        :return: the device index reported by the module's first parameter or None if it cannot be determined
            (e.g. because there is no module or the module has no parameters)
        """
        try:
            return next(self.module.parameters()).get_device()
        except Exception:
            return None

    @property
    def best_epoch(self) -> Optional[int]:
        return self.trainingInfo.best_epoch if self.trainingInfo is not None else None

    @property
    def total_epochs(self) -> Optional[int]:
        return self.trainingInfo.total_epochs if self.trainingInfo is not None else None

    def _tostring_excludes(self) -> List[str]:
        return ['_gpu', 'module', 'trainingInfo', "inputScaler", "outputScaler"]

    def _tostring_additional_entries(self):
        return dict(bestEpoch=self.best_epoch, totalEpochs=self.total_epochs)
class TorchModelFittingStrategy(ABC):
    """
    Interface of the fitting strategies that can be passed to TorchModel.fit
    """
    @abstractmethod
    def fit(self, model: TorchModel, data: TorchDataSetProvider, nn_optimiser: NNOptimiser) -> Optional[TrainingInfo]:
        """
        Fits the given model.

        :param model: the model to fit
        :param data: the provider of the data with which to fit the model
        :param nn_optimiser: the optimiser which can be applied in order to fit the model
        :return: the resulting training info (if any), which TorchModel.fit stores in the model
        """
class TorchModelFittingStrategyDefault(TorchModelFittingStrategy):
    """
    The default fitting strategy: the given optimiser is applied directly to the model and the data
    """
    def fit(self, model: TorchModel, data: TorchDataSetProvider, nn_optimiser: NNOptimiser) -> Optional[TrainingInfo]:
        training_info = nn_optimiser.fit(model, data)
        return training_info
class TorchModelFromModuleFactory(TorchModel):
    """
    A TorchModel whose torch module is created by calling a factory with stored arguments
    """
    def __init__(self, module_factory: Callable[..., torch.nn.Module], *args, cuda: bool = True, **kwargs) -> None:
        """
        :param module_factory: the callable which creates the torch module
        :param args: the positional arguments to pass to the factory
        :param cuda: whether the model is to be applied using CUDA
        :param kwargs: the keyword arguments to pass to the factory
        """
        super().__init__(cuda)
        self.args = args
        self.kwargs = kwargs
        self.moduleFactory = module_factory

    def create_torch_module(self) -> torch.nn.Module:
        factory = self.moduleFactory
        return factory(*self.args, **self.kwargs)
class TorchModelFromModule(TorchModel):
    """
    A TorchModel which wraps a torch module that already exists at construction time
    """
    def __init__(self, module: torch.nn.Module, cuda: bool = True):
        """
        :param module: the torch module to wrap
        :param cuda: whether the model is to be applied using CUDA
        """
        super().__init__(cuda=cuda)
        self.module = module

    def create_torch_module(self) -> torch.nn.Module:
        # no construction necessary: the module given at construction is the module
        existing_module = self.module
        return existing_module
class TorchModelFactoryFromModule:
    """Represents a factory for the creation of a TorchModel based on a torch module"""
    def __init__(self, module: torch.nn.Module, cuda: bool = True):
        """
        :param module: the torch module which the created models shall wrap
        :param cuda: whether the created models are to be applied using CUDA
        """
        self.module = module
        self.cuda = cuda

    def __call__(self) -> TorchModel:
        """
        :return: a new TorchModel wrapping the module that was passed at construction
        """
        torch_model = TorchModelFromModule(self.module, self.cuda)
        return torch_model
class VectorTorchModel(TorchModel, ABC):
    """
    Base class for TorchModels that can be used within VectorModels, where the input and output dimensions
    are determined by the data
    """
    def __init__(self, cuda: bool = True) -> None:
        """
        :param cuda: whether the model is to be applied using CUDA
        """
        super().__init__(cuda=cuda)
        # the dimensions are unknown until they are read from the data set provider
        self.inputDim = None
        self.outputDim = None

    def _extract_params_from_data(self, data: TorchDataSetProvider) -> None:
        super()._extract_params_from_data(data)
        self.inputDim = data.get_input_dim()
        self.outputDim = data.get_model_output_dim()

    def create_torch_module(self) -> torch.nn.Module:
        input_dim, output_dim = self.inputDim, self.outputDim
        return self.create_torch_module_for_dims(input_dim, output_dim)

    @abstractmethod
    def create_torch_module_for_dims(self, input_dim: int, output_dim: int) -> torch.nn.Module:
        """
        Creates the torch module for the given dimensions.

        :param input_dim: the number of input dimensions as reported by the data set provider (number of columns
            in input data frame for default providers)
        :param output_dim: the number of output dimensions as reported by the data set provider (for default providers,
            this will be the number of columns in the output data frame or, for classification, the number of classes)
        :return: the torch module
        """
class TorchAutoregressiveResultHandler(ABC):
    """
    Supports the storage of prediction results such that subsequent predictions
    can build on earlier predictions, thus supporting autoregressive models.
    """

    @abstractmethod
    def clear_results(self):
        """
        Discards all results that were previously saved via :meth:`save_results`.
        """

    @abstractmethod
    def save_results(self, input_df: pd.DataFrame, results: np.ndarray) -> None:
        """
        Saves the regression results such that they can be used as input for subsequent prediction steps.
        The input will typically be processed by a feature generator or vectoriser, so the result
        should be stored in a place from which the respective feature generator or vectoriser can retrieve it.

        :param input_df: the input data frame for which results were obtained (number of rows corresponds to
            length of `results`)
        :param results: the results array, which is typically a 2D array where `results[i]` is an array
            containing the results for the i-th input row
        """
# type variable used to annotate `self` in the fluent `with_*` methods of TorchVectorRegressionModel
TTorchVectorRegressionModel = typing.TypeVar(
    "TTorchVectorRegressionModel",
    bound="TorchVectorRegressionModel",
)
class TorchVectorRegressionModel(VectorRegressionModel):
    """
    Base class for the implementation of VectorRegressionModels based on TorchModels.
    An instance of this class will have an instance of TorchModel as the underlying model.
    """

    def __init__(self, torch_model_factory: Callable[[], TorchModel],
            normalisation_mode: NormalisationMode = NormalisationMode.NONE,
            nn_optimiser_params: Union[dict, NNOptimiserParams, None] = None) -> None:
        """
        :param torch_model_factory: the factory function with which to create the contained TorchModel instance that the instance is to
            encapsulate. For the instance to be picklable, this cannot be a lambda or locally defined function.
        :param normalisation_mode: the normalisation mode to apply to input data frames
        :param nn_optimiser_params: the parameters to apply in NNOptimiser during training
        """
        super().__init__()

        if nn_optimiser_params is None:
            nn_optimiser_params_instance = NNOptimiserParams()
        else:
            nn_optimiser_params_instance = NNOptimiserParams.from_dict_or_instance(nn_optimiser_params)
        if nn_optimiser_params_instance.loss_evaluator is None:
            nn_optimiser_params_instance.loss_evaluator = NNLossEvaluatorRegression(NNLossEvaluatorRegression.LossFunction.MSELOSS)

        self.torch_model_factory = torch_model_factory
        self.normalisationMode = normalisation_mode
        self.nnOptimiserParams = nn_optimiser_params_instance
        self.model: Optional[TorchModel] = None
        self.inputTensoriser: Optional[Tensoriser] = None
        self.outputTensoriser: Optional[Tensoriser] = None
        self.outputTensorToArrayConverter: Optional[OutputTensorToArrayConverter] = None
        self.torchDataSetProviderFactory: Optional[TorchDataSetProviderFactory] = None
        self.dataFrameSplitter: Optional[DataFrameSplitter] = None
        self._normalisationCheckThreshold = 5
        self.inferenceBatchSize: Optional[int] = None
        self.autoregressiveResultHandler: Optional[TorchAutoregressiveResultHandler] = None

    def __setstate__(self, state) -> None:
        if "modelClass" in state:  # old-style factory
            state["torch_model_factory"] = functools.partial(state["modelClass"], *state["modelArgs"], **state["modelKwArgs"])
            for k in ("modelClass", "modelArgs", "modelKwArgs"):
                del state[k]
        state["nnOptimiserParams"] = NNOptimiserParams.from_dict_or_instance(state["nnOptimiserParams"])
        # the names must match the attribute names assigned in __init__ exactly
        new_optional_members = ["inputTensoriser", "torchDataSetProviderFactory", "dataFrameSplitter", "outputTensoriser",
            "outputTensorToArrayConverter", "inferenceBatchSize", "autoregressiveResultHandler"]
        new_default_properties = {"_normalisationCheckThreshold": 5}
        setstate(TorchVectorRegressionModel, self, state, new_optional_properties=new_optional_members,
            new_default_properties=new_default_properties)

    @classmethod
    def from_module(cls, module: torch.nn.Module, cuda=True, normalisation_mode: NormalisationMode = NormalisationMode.NONE,
            nn_optimiser_params: Optional[NNOptimiserParams] = None) -> "TorchVectorRegressionModel":
        """
        Creates a model which wraps the given torch module.

        :param module: the torch module
        :param cuda: whether the underlying TorchModel is to be applied using CUDA
        :param normalisation_mode: the normalisation mode to apply to input data frames
        :param nn_optimiser_params: the parameters to apply in NNOptimiser during training
        :return: the model
        """
        return cls(TorchModelFactoryFromModule(module=module, cuda=cuda), normalisation_mode=normalisation_mode,
            nn_optimiser_params=nn_optimiser_params)

    def _tostring_excludes(self) -> List[str]:
        excludes = super()._tostring_excludes()
        if self.model is not None:
            return excludes + ["modelClass", "modelArgs", "modelKwArgs"]
        else:
            return excludes

    def with_input_tensoriser(self: TTorchVectorRegressionModel, tensoriser: Tensoriser) -> TTorchVectorRegressionModel:
        """
        :param tensoriser: tensoriser to use in order to convert input data frames to (one or more) tensors.
            The default tensoriser directly converts the data frame's values (which is assumed to contain only scalars that
            can be coerced to floats) to a float tensor.
            The use of a custom tensoriser is necessary if a non-trivial conversion is necessary or if the data frame
            is to be converted to more than one input tensor.
        :return: self
        """
        self.inputTensoriser = tensoriser
        return self

    def with_output_tensoriser(self: TTorchVectorRegressionModel, tensoriser: RuleBasedTensoriser) -> TTorchVectorRegressionModel:
        """
        :param tensoriser: tensoriser to use in order to convert the output data frame to a tensor.
            The default output tensoriser directly converts the data frame's values to a float tensor.

            NOTE: It is required to be a rule-based tensoriser, because mechanisms that require fitting on the data
            and thus perform a data-dependent conversion are likely to cause problems because they would need
            to be reversed at inference time (since the model will be trained on the converted values). If you require
            a transformation, use a target transformer, which will be applied before the tensoriser.
        :return: self
        """
        self.outputTensoriser = tensoriser
        return self

    def with_output_tensor_to_array_converter(self: TTorchVectorRegressionModel,
            output_tensor_to_array_converter: "OutputTensorToArrayConverter") -> TTorchVectorRegressionModel:
        """
        Configures the use of a custom converter from tensors to numpy arrays, which is applied during inference.
        A custom converter can be required, for example, to handle variable-length outputs (where the output tensor
        will typically contain unwanted padding). Note that since the converter is for inference only, it may be
        required to use a custom loss evaluator during training if the use of a custom converter is necessary.

        :param output_tensor_to_array_converter: the converter
        :return: self
        """
        self.outputTensorToArrayConverter = output_tensor_to_array_converter
        return self

    def with_torch_data_set_provider_factory(self: TTorchVectorRegressionModel,
            torch_data_set_provider_factory: "TorchDataSetProviderFactory") -> TTorchVectorRegressionModel:
        """
        :param torch_data_set_provider_factory: the torch data set provider factory, which is used to instantiate the provider which
            will provide the training and validation data sets from the input data frame that is passed in for learning.
            By default, TorchDataSetProviderFactoryRegressionDefault is used.
        :return: self
        """
        self.torchDataSetProviderFactory = torch_data_set_provider_factory
        return self

    def with_data_frame_splitter(self: TTorchVectorRegressionModel, data_frame_splitter: DataFrameSplitter) -> TTorchVectorRegressionModel:
        """
        :param data_frame_splitter: the data frame splitter which is used to split the input/output data frames that are passed for
            learning into a data frame that is used for training and a data frame that is used for validation.
            The input data frame is the data frame that is passed as input to the splitter, and the returned indices
            are used to split both the input and output data frames in the same way.
        :return: self
        """
        self.dataFrameSplitter = data_frame_splitter
        return self

    def with_normalisation_check_threshold(self: TTorchVectorRegressionModel, threshold: Optional[float]) -> TTorchVectorRegressionModel:
        """
        Defines a threshold with which to check inputs that are passed to the underlying neural network.
        Whenever an (absolute) input value exceeds the threshold, a warning is triggered.

        :param threshold: the threshold
        :return: self
        """
        self._normalisationCheckThreshold = threshold
        return self

    def with_autoregressive_result_handler(self: TTorchVectorRegressionModel,
            result_handler: TorchAutoregressiveResultHandler,
            inference_batch_size=1) -> TTorchVectorRegressionModel:
        """
        Adds a result handler which can be used to store prediction results such that subsequent predictions
        can use the prediction result, supporting autoregressive models.
        The autoregressive predictions are assumed to be handled in a single call to method :meth:`predict`,
        and the results will be stored for the duration of the call.
        For autoregressive predictions that build on earlier predictions, we must typically restrict
        the batch size such that predictions from the earlier batch can be saved and correctly reused
        as input for the subsequent predictions. The model's input preprocessors (such as feature generators
        or vectorisers) must ensure that the results being stored by the result handler are appropriately
        used as input.

        :param result_handler: the result handler
        :param inference_batch_size: the batch size to use for predictions
        :return: self
        """
        self.autoregressiveResultHandler = result_handler
        self.inferenceBatchSize = inference_batch_size
        return self

    def _create_torch_model(self) -> TorchModel:
        """
        :return: a new TorchModel created via the factory, configured with this instance's normalisation check threshold
        """
        torch_model = self.torch_model_factory()
        torch_model.set_normalisation_check_threshold(self._normalisationCheckThreshold)
        return torch_model

    def _create_data_set_provider(self, inputs: pd.DataFrame, outputs: pd.DataFrame) -> TorchDataSetProvider:
        factory = self.torchDataSetProviderFactory
        if factory is None:
            factory = TorchDataSetProviderFactoryRegressionDefault()
        return factory.create_data_set_provider(inputs, outputs, self, self._trainingContext, input_tensoriser=self.inputTensoriser,
            output_tensoriser=self.outputTensoriser, data_frame_splitter=self.dataFrameSplitter)

    def _fit(self, inputs: pd.DataFrame, outputs: pd.DataFrame) -> None:
        if self.inputTensoriser is not None:
            log.info(f"Fitting {self.inputTensoriser} ...")
            self.inputTensoriser.fit(inputs, model=self)
        self.model = self._create_torch_model()
        data_set_provider = self._create_data_set_provider(inputs, outputs)
        self.model.fit(data_set_provider, self.nnOptimiserParams)

    def _predict_outputs_for_input_data_frame(self, inputs: pd.DataFrame) -> np.ndarray:
        """
        Applies the underlying model batch-wise to the given inputs.

        :param inputs: the input data frame
        :return: the concatenated results of all batches
        """
        result_handler = self.autoregressiveResultHandler
        tensorise_dynamically = False
        if result_handler is not None:
            result_handler.clear_results()
            tensorise_dynamically = True  # must be dynamically tensorised to allow inclusion of predicted results
        batch_size = self.nnOptimiserParams.batch_size if self.inferenceBatchSize is None else self.inferenceBatchSize
        results: List[np.ndarray] = []
        try:
            data_set = TorchDataSetFromDataFrames(inputs, None, self.model.cuda, input_tensoriser=self.inputTensoriser,
                tensorise_dynamically=tensorise_dynamically)
            start_idx = 0
            for input_batch in data_set.iter_batches(batch_size, input_only=True):
                if self.outputTensorToArrayConverter is None:
                    result = self.model.apply_scaled(input_batch, as_numpy=True)
                else:
                    output_batch = self.model.apply_scaled(input_batch, as_numpy=False)
                    result = self.outputTensorToArrayConverter.convert(output_batch, input_batch)
                if result_handler is not None:
                    result_handler.save_results(inputs.iloc[start_idx:start_idx+len(result)], result)
                start_idx += len(result)
                results.append(result)
        finally:
            # stored results are valid for the duration of this call only, so they are discarded even on failure
            if result_handler is not None:
                result_handler.clear_results()
        return np.concatenate(results)

    def _predict(self, inputs: pd.DataFrame) -> pd.DataFrame:
        y_array = self._predict_outputs_for_input_data_frame(inputs)
        return pd.DataFrame(y_array, columns=self.get_model_output_variable_names())
# type variable used to annotate `self` in the fluent `with_*` methods of TorchVectorClassificationModel
TTorchVectorClassificationModel = typing.TypeVar(
    "TTorchVectorClassificationModel",
    bound="TorchVectorClassificationModel",
)
651class TorchVectorClassificationModel(VectorClassificationModel):
652 """
653 Base class for the implementation of VectorClassificationModels based on TorchModels.
654 An instance of this class will have an instance of TorchModel as the underlying model.
655 """
656 def __init__(self, output_mode: ClassificationOutputMode,
657 torch_model_factory: Callable[[], TorchModel],
658 normalisation_mode: NormalisationMode = NormalisationMode.NONE,
659 nn_optimiser_params: Optional[NNOptimiserParams] = None) -> None:
660 """
661 :param output_mode: specifies the nature of the output of the underlying neural network model
662 :param torch_model_factory: the factory function with which to create the contained TorchModel instance that the instance is to
663 encapsulate. For the instance to be picklable, this cannot be a lambda or locally defined function.
664 :param normalisation_mode: the normalisation mode to apply to input data frames
665 :param nn_optimiser_params: the parameters to apply in NNOptimiser during training
666 """
667 super().__init__()
669 if nn_optimiser_params is None:
670 nn_optimiser_params = NNOptimiserParams()
671 if nn_optimiser_params.loss_evaluator is None:
672 loss_function = NNLossEvaluatorClassification.LossFunction.default_for_output_mode(output_mode)
673 nn_optimiser_params.loss_evaluator = NNLossEvaluatorClassification(loss_function)
675 self.outputMode = output_mode
676 self.torch_model_factory = torch_model_factory
677 self.normalisationMode = normalisation_mode
678 self.nnOptimiserParams: NNOptimiserParams = nn_optimiser_params
679 self.model: Optional[TorchModel] = None
680 self.inputTensoriser: Optional[Tensoriser] = None
681 self.outputTensoriser: Optional[Tensoriser] = None
682 self.torchDataSetProviderFactory: Optional[TorchDataSetProviderFactory] = None
683 self.dataFrameSplitter: Optional[DataFrameSplitter] = None
684 self._normalisationCheckThreshold = 5
# noinspection DuplicatedCode
def __setstate__(self, state) -> None:
    """
    Restores the instance from a persisted state dictionary, upgrading states that use the old-style model factory
    representation.

    :param state: the state dictionary to restore from; it is modified in place
    """
    legacy_factory_keys = ("modelClass", "modelArgs", "modelKwArgs")
    if "modelClass" in state:  # old-style factory
        # merge the separately stored class, positional and keyword arguments into a single factory callable
        model_class, model_args, model_kwargs = [state[key] for key in legacy_factory_keys]
        state["torch_model_factory"] = functools.partial(model_class, *model_args, **model_kwargs)
        for key in legacy_factory_keys:
            del state[key]
    state["nnOptimiserParams"] = NNOptimiserParams.from_dict_or_instance(state["nnOptimiserParams"])
    # members which may be absent from older states are declared to setstate
    optional_members = ["inputTensoriser", "torchDataSetProviderFactory", "dataFrameSplitter", "outputTensoriser"]
    default_members = {"outputMode": ClassificationOutputMode.PROBABILITIES, "_normalisationCheckThreshold": 5}
    setstate(TorchVectorClassificationModel, self, state, new_optional_properties=optional_members,
        new_default_properties=default_members)
@classmethod
def from_module(cls, module: torch.nn.Module, output_mode: ClassificationOutputMode, cuda=True,
        normalisation_mode: NormalisationMode = NormalisationMode.NONE,
        nn_optimiser_params: Optional[NNOptimiserParams] = None) -> "TorchVectorClassificationModel":
    """
    Creates a model instance whose torch model factory is a TorchModelFactoryFromModule wrapping the given module.

    :param module: the torch module to wrap
    :param output_mode: the output mode, which is passed on to the constructor
    :param cuda: passed on to TorchModelFactoryFromModule
    :param normalisation_mode: the normalisation mode to apply to input data frames
    :param nn_optimiser_params: the parameters to apply in NNOptimiser during training
    :return: the new model instance
    """
    model_factory = TorchModelFactoryFromModule(module, cuda=cuda)
    return cls(output_mode, model_factory, normalisation_mode=normalisation_mode, nn_optimiser_params=nn_optimiser_params)
def with_input_tensoriser(self: TTorchVectorClassificationModel, tensoriser: Tensoriser) -> TTorchVectorClassificationModel:
    """
    Sets the tensoriser which converts input data frames to (one or more) tensors.

    :param tensoriser: the tensoriser to use for input data frames.
        If none is set, the default conversion applies, which directly converts the data frame's values (assumed to be
        scalars that can be coerced to floats) to a float tensor.
        A custom tensoriser is required whenever the conversion is non-trivial or the data frame is to be converted
        to more than one input tensor.
    :return: self
    """
    self.inputTensoriser = tensoriser
    return self
def with_output_tensoriser(self: TTorchVectorClassificationModel, tensoriser: RuleBasedTensoriser) -> TTorchVectorClassificationModel:
    """
    Sets the tensoriser which converts the output data frame to a tensor.

    :param tensoriser: the tensoriser to use for the output data frame.
        NOTE: A rule-based tensoriser is required, because a tensoriser that must be fitted on the data performs a
        data-dependent conversion, which is likely to cause problems: the conversion would have to be reversed at
        inference time (since the model will be trained on the converted values). If you require a transformation,
        use a target transformer, which will be applied before the tensoriser.
    :return: self
    """
    self.outputTensoriser = tensoriser
    return self
def with_torch_data_set_provider_factory(self: TTorchVectorClassificationModel,
        torch_data_set_provider_factory: "TorchDataSetProviderFactory") -> TTorchVectorClassificationModel:
    """
    Sets the factory which creates the torch data set provider used for learning.

    :param torch_data_set_provider_factory: the factory with which to instantiate the provider that supplies the training
        and validation data sets derived from the data frames passed in for learning.
        If no factory is set, TorchDataSetProviderFactoryClassificationDefault is used.
    :return: self
    """
    self.torchDataSetProviderFactory = torch_data_set_provider_factory
    return self
def with_data_frame_splitter(self: TTorchVectorClassificationModel, data_frame_splitter: DataFrameSplitter) \
        -> TTorchVectorClassificationModel:
    """
    Sets the splitter which divides the data passed for learning into a training part and a validation part.

    :param data_frame_splitter: the data frame splitter to use.
        The splitter receives the input data frame, and the indices it returns are applied to both the input and the
        output data frame, such that the two are split in the same way.
    :return: self
    """
    self.dataFrameSplitter = data_frame_splitter
    return self
def with_normalisation_check_threshold(self: TTorchVectorClassificationModel, threshold: Optional[float]) \
        -> TTorchVectorClassificationModel:
    """
    Sets the threshold against which inputs passed to the underlying neural network are checked:
    a warning is triggered whenever the absolute value of an input exceeds it.

    :param threshold: the threshold
    :return: self
    """
    self._normalisationCheckThreshold = threshold
    return self
def _create_torch_model(self) -> TorchModel:
    """
    Creates a new torch model via the configured factory and applies the normalisation check threshold to it.

    :return: the newly created torch model
    """
    new_model = self.torch_model_factory()
    new_model.set_normalisation_check_threshold(self._normalisationCheckThreshold)
    return new_model
def _create_data_set_provider(self, inputs: pd.DataFrame, outputs: pd.DataFrame) -> TorchDataSetProvider:
    """
    Creates the data set provider for the given data, using the configured provider factory or, if none was configured,
    a TorchDataSetProviderFactoryClassificationDefault.

    :param inputs: the input data frame
    :param outputs: the output data frame
    :return: the data set provider
    """
    configured_factory = self.torchDataSetProviderFactory
    provider_factory = TorchDataSetProviderFactoryClassificationDefault() if configured_factory is None else configured_factory
    return provider_factory.create_data_set_provider(
        inputs,
        outputs,
        self,
        self._trainingContext,
        input_tensoriser=self.inputTensoriser,
        output_tensoriser=self.outputTensoriser,
        data_frame_splitter=self.dataFrameSplitter)
def _fit_classifier(self, inputs: pd.DataFrame, outputs: pd.DataFrame) -> None:
    """
    Fits the classifier: fits the input tensoriser (if any), converts the class labels to label indices,
    creates the torch model and trains it on the resulting data.

    :param inputs: the input data frame
    :param outputs: the output data frame, which must contain exactly one column holding the class labels
    :raises ValueError: if the output data frame does not have exactly one column or contains a label
        that is not among the model's labels
    """
    if len(outputs.columns) != 1:
        raise ValueError("Expected one output dimension: the class labels")

    if self.inputTensoriser is not None:
        log.info(f"Fitting {self.inputTensoriser} ...")
        self.inputTensoriser.fit(inputs, model=self)

    # transform outputs: for each data point, the new output shall be the index in the list of labels.
    # The mapping is built once, so that each label is resolved in constant time instead of via a linear search;
    # setdefault retains the first index of a label, which is what list.index would return.
    label_to_index = {}
    for label_index, known_label in enumerate(self._labels):
        label_to_index.setdefault(known_label, label_index)
    labels: pd.Series = outputs.iloc[:, 0]
    try:
        label_indices = [label_to_index[label] for label in labels]
    except KeyError as e:
        raise ValueError(f"Label {e.args[0]!r} is not among the model's labels") from e
    outputs = pd.DataFrame(label_indices, columns=outputs.columns, index=outputs.index)

    self.model = self._create_torch_model()

    data_set_provider = self._create_data_set_provider(inputs, outputs)
    self.model.fit(data_set_provider, self.nnOptimiserParams)
def _predict_outputs_for_input_data_frame(self, inputs: pd.DataFrame) -> torch.Tensor:
    """
    Applies the torch model to the given inputs batch by batch and concatenates the per-batch outputs.

    :param inputs: the input data frame
    :return: the tensor obtained by concatenating the outputs of all batches along dimension 0
    """
    batch_size = self.nnOptimiserParams.batch_size
    data_set = TorchDataSetFromDataFrames(inputs, None, self.model.cuda, input_tensoriser=self.inputTensoriser)
    batch_outputs = [
        self.model.apply_scaled(input_batch, as_numpy=False)
        for input_batch in data_set.iter_batches(batch_size, input_only=True)
    ]
    return torch.cat(batch_outputs, dim=0)
def _predict_class_probabilities(self, inputs: pd.DataFrame) -> pd.DataFrame:
    """
    Computes the class probabilities for the given inputs, converting the model's raw outputs according to
    the configured output mode.

    :param inputs: the input data frame
    :return: a data frame with one column per class label containing the probabilities
    :raises ValueError: if the output mode is not handled
    """
    y = self._predict_outputs_for_input_data_frame(inputs)
    if self.outputMode == ClassificationOutputMode.PROBABILITIES:
        pass
    elif self.outputMode == ClassificationOutputMode.LOG_PROBABILITIES:
        y = y.exp()
    elif self.outputMode == ClassificationOutputMode.UNNORMALISED_LOG_PROBABILITIES:
        y = y.softmax(dim=1)
    else:
        raise ValueError(f"Unhandled output mode {self.outputMode}")
    # Tensor.numpy() raises for tensors that require grad or do not reside on the CPU, so the tensor is
    # detached and moved to the CPU first (both operations are cheap if they are not actually needed)
    return pd.DataFrame(y.detach().cpu().numpy(), columns=self._labels)
def _tostring_excludes(self) -> List[str]:
    """
    :return: the names of the attributes to exclude from the string representation: the excludes of the super-class,
        extended by the old-style factory attributes if a model is present
    """
    base_excludes = super()._tostring_excludes()
    if self.model is None:
        return base_excludes
    return base_excludes + ["modelClass", "modelArgs", "modelKwArgs"]
class TorchDataSetProviderFactory(ABC):
    """
    Base class for factories which create the TorchDataSetProvider for a torch vector model.
    """

    @abstractmethod
    def create_data_set_provider(self, inputs: pd.DataFrame, outputs: pd.DataFrame,
            model: Union[TorchVectorRegressionModel, TorchVectorClassificationModel], training_context: TrainingContext,
            input_tensoriser: Optional[Tensoriser], output_tensoriser: Optional[Tensoriser],
            data_frame_splitter: Optional[DataFrameSplitter]) -> TorchDataSetProvider:
        """
        Creates the data set provider for the given data.

        :param inputs: the input data frame
        :param outputs: the output data frame
        :param model: the model for which the provider is created
        :param training_context: the training context
        :param input_tensoriser: the tensoriser for the input data frame (if any)
        :param output_tensoriser: the tensoriser for the output data frame (if any)
        :param data_frame_splitter: the splitter for the data frames (if any)
        :return: the data set provider
        """
class TorchDataSetProviderFactoryClassificationDefault(TorchDataSetProviderFactory):
    """
    Provider factory for classification models, which creates the provider from a ClassificationVectorDataUtil.
    """

    def __init__(self, tensorise_dynamically=False):
        """
        :param tensorise_dynamically: whether tensorisation shall take place on the fly whenever the provided data sets are iterated;
            if False, tensorisation takes place once in a precomputation stage (tensors must jointly fit into memory)
        """
        self.tensoriseDynamically = tensorise_dynamically

    def create_data_set_provider(self, inputs: pd.DataFrame, outputs: pd.DataFrame, model: TorchVectorClassificationModel,
            training_context: TrainingContext, input_tensoriser: Optional[Tensoriser], output_tensoriser: Optional[Tensoriser],
            data_frame_splitter: Optional[DataFrameSplitter]) -> TorchDataSetProvider:
        """
        Creates the data set provider for the given classification model and data.

        :param inputs: the input data frame
        :param outputs: the output data frame
        :param model: the classification model, from which the cuda setting, the labels and the normalisation mode are read
        :param training_context: the training context (not used by this implementation)
        :param input_tensoriser: the tensoriser for the input data frame (if any)
        :param output_tensoriser: the tensoriser for the output data frame (if any)
        :param data_frame_splitter: the splitter for the data frames (if any)
        :return: the data set provider
        """
        cuda = model.model.cuda
        num_labels = len(model._labels)  # TODO FIXME
        data_util = ClassificationVectorDataUtil(
            inputs,
            outputs,
            cuda,
            num_labels,
            normalisation_mode=model.normalisationMode,
            input_tensoriser=input_tensoriser,
            output_tensoriser=output_tensoriser,
            data_frame_splitter=data_frame_splitter)
        return TorchDataSetProviderFromVectorDataUtil(data_util, cuda, tensorise_dynamically=self.tensoriseDynamically)
class TorchDataSetProviderFactoryRegressionDefault(TorchDataSetProviderFactory):
    """
    Provider factory for regression models, which creates the provider from a VectorDataUtil.
    """

    def __init__(self, tensorise_dynamically=False):
        """
        :param tensorise_dynamically: whether tensorisation shall take place on the fly whenever the provided data sets are iterated;
            if False, tensorisation takes place once in a precomputation stage (tensors must jointly fit into memory)
        """
        self.tensoriseDynamically = tensorise_dynamically

    def create_data_set_provider(self, inputs: pd.DataFrame, outputs: pd.DataFrame, model: TorchVectorRegressionModel,
            training_context: TrainingContext, input_tensoriser: Optional[Tensoriser], output_tensoriser: Optional[Tensoriser],
            data_frame_splitter: Optional[DataFrameSplitter]) -> TorchDataSetProvider:
        """
        Creates the data set provider for the given regression model and data.

        :param inputs: the input data frame
        :param outputs: the output data frame
        :param model: the regression model, from which the cuda setting and the normalisation mode are read
        :param training_context: the training context (not used by this implementation)
        :param input_tensoriser: the tensoriser for the input data frame (if any)
        :param output_tensoriser: the tensoriser for the output data frame (if any)
        :param data_frame_splitter: the splitter for the data frames (if any)
        :return: the data set provider
        """
        cuda = model.model.cuda
        data_util = VectorDataUtil(
            inputs,
            outputs,
            cuda,
            normalisation_mode=model.normalisationMode,
            input_tensoriser=input_tensoriser,
            output_tensoriser=output_tensoriser,
            data_frame_splitter=data_frame_splitter)
        return TorchDataSetProviderFromVectorDataUtil(data_util, cuda, tensorise_dynamically=self.tensoriseDynamically)
871class OutputTensorToArrayConverter(ABC):
@abstractmethod
def convert(self, model_output: torch.Tensor, model_input: Union[torch.Tensor, Sequence[torch.Tensor]]) -> np.ndarray:
    """
    Converts the output tensor produced by the model to a numpy array.

    :param model_output: the output tensor generated by the model
    :param model_input: the input tensor(s) for which the model produced the output (which may provide relevant meta-data)
    :return: a numpy array of shape (N, D), where N=model_output.shape[0] is the number of data points and D is the
        number of variables predicted by the model
    """