Coverage for src/sensai/torch/torch_base.py: 70%

454 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-29 18:29 +0000

1import functools 

2import io 

3import logging 

4import typing 

5from abc import ABC, abstractmethod 

6from typing import Union, Tuple, Callable, Optional, List, Sequence 

7 

8import numpy as np 

9import pandas as pd 

10import torch 

11from torch import nn 

12from torch.nn import functional as F 

13 

14from .torch_data import TensorScaler, VectorDataUtil, ClassificationVectorDataUtil, TorchDataSet, \ 

15 TorchDataSetProvider, Tensoriser, TorchDataSetFromDataFrames, RuleBasedTensoriser, \ 

16 TorchDataSetProviderFromVectorDataUtil 

17from .torch_enums import ClassificationOutputMode 

18from .torch_opt import NNOptimiser, NNLossEvaluatorRegression, NNLossEvaluatorClassification, NNOptimiserParams, TrainingInfo 

19from ..data import DataFrameSplitter 

20from ..normalisation import NormalisationMode 

21from ..util.dtype import to_float_array 

22from ..util.pickle import setstate 

23from ..util.string import ToStringMixin 

24from ..vector_model import VectorRegressionModel, VectorClassificationModel, TrainingContext 

25 

26log: logging.Logger = logging.getLogger(__name__) 

27 

28 

29class MCDropoutCapableNNModule(nn.Module, ABC): 

30 """ 

31 Base class for NN modules that are to support MC-Dropout. 

32 Support can be added by applying the _dropout function in the module's forward method. 

33 Then, to apply inference that samples results, call inferMCDropout rather than just using __call__. 

34 """ 

35 

36 def __init__(self) -> None: 

37 super().__init__() 

38 self._applyMCDropout = False 

39 self._pMCDropoutOverride = None 

40 

41 def __setstate__(self, d: dict) -> None: 

42 if "_applyMCDropout" not in d: 

43 d["_applyMCDropout"] = False 

44 if "_pMCDropoutOverride" not in d: 

45 d["_pMCDropoutOverride"] = None 

46 super().__setstate__(d) 

47 

48 def _dropout(self, x: torch.Tensor, p_training=None, p_inference=None) -> torch.Tensor: 

49 """ 

50 This method is to to applied within the module's forward method to apply dropouts during training and/or inference. 

51 

52 :param x: the model input tensor 

53 :param p_training: the probability with which to apply dropouts during training; if None, apply no dropout 

54 :param p_inference: the probability with which to apply dropouts during MC-Dropout-based inference (via inferMCDropout, 

55 which may override the probability via its optional argument); 

56 if None, a dropout is not to be applied 

57 :return: a potentially modified version of x with some elements dropped out, depending on application context and dropout 

58 probabilities 

59 """ 

60 if self.training and p_training is not None: 

61 return F.dropout(x, p_training) 

62 elif not self.training and self._applyMCDropout and p_inference is not None: 

63 return F.dropout(x, p_inference if self._pMCDropoutOverride is None else self._pMCDropoutOverride) 

64 else: 

65 return x 

66 

67 def _enable_mc_dropout(self, enabled=True, p_mc_dropout_override=None) -> None: 

68 self._applyMCDropout = enabled 

69 self._pMCDropoutOverride = p_mc_dropout_override 

70 

71 def infer_mc_dropout(self, x: Union[torch.Tensor, Sequence[torch.Tensor]], num_samples, p=None) -> Tuple[torch.Tensor, torch.Tensor]: 

72 """ 

73 Applies inference using MC-Dropout, drawing the given number of samples. 

74 

75 :param x: the model input (a tensor or tuple/list of tensors) 

76 :param num_samples: the number of samples to draw with MC-Dropout 

77 :param p: the dropout probability to apply, overriding the probability specified by the model's forward method; if None, use model's 

78 default 

79 :return: a pair (y, sd) where y the mean output tensor and sd is a tensor of the same dimension containing standard deviations 

80 """ 

81 if type(x) not in (tuple, list): 

82 x = [x] 

83 results = [] 

84 self._enable_mc_dropout(True, p_mc_dropout_override=p) 

85 try: 

86 for i in range(num_samples): 

87 y = self(*x) 

88 results.append(y) 

89 finally: 

90 self._enable_mc_dropout(False) 

91 results = torch.stack(results) 

92 mean = torch.mean(results, 0) 

93 stddev = torch.std(results, 0, unbiased=False) 

94 return mean, stddev 

95 

96 

97class TorchModel(ABC, ToStringMixin): 

98 """ 

99 sensAI abstraction for torch models, which supports one-line training, allows for convenient model application, 

100 has basic mechanisms for data scaling, and soundly handles persistence (via pickle). 

101 An instance wraps a torch.nn.Module, which is constructed on demand during training via the factory method 

102 createTorchModule. 

103 """ 

104 log: logging.Logger = log.getChild(__qualname__) 

105 

106 def __init__(self, cuda=True) -> None: 

107 self.cuda: bool = cuda 

108 self.module: Optional[torch.nn.Module] = None 

109 self.outputScaler: Optional[TensorScaler] = None 

110 self.inputScaler: Optional[TensorScaler] = None 

111 self.trainingInfo: Optional[TrainingInfo] = None 

112 self._gpu: Optional[int] = None 

113 self._normalisationCheckThreshold: Optional[int] = 5 

114 

115 def _tostring_exclude_private(self) -> bool: 

116 return True 

117 

118 def set_torch_module(self, module: torch.nn.Module) -> None: 

119 self.module = module 

120 

121 def set_normalisation_check_threshold(self, threshold: Optional[float]): 

122 self._normalisationCheckThreshold = threshold 

123 

124 def get_module_bytes(self) -> bytes: 

125 bytes_io = io.BytesIO() 

126 torch.save(self.module, bytes_io) 

127 return bytes_io.getvalue() 

128 

129 def set_module_bytes(self, model_bytes: bytes) -> None: 

130 model_file = io.BytesIO(model_bytes) 

131 self._load_model(model_file) 

132 

133 def get_torch_module(self) -> torch.nn.Module: 

134 return self.module 

135 

136 def _set_cuda_enabled(self, is_cuda_enabled: bool) -> None: 

137 self.cuda = is_cuda_enabled 

138 

139 def _is_cuda_enabled(self) -> bool: 

140 return self.cuda 

141 

142 def _load_model(self, model_file) -> None: # TODO: complete type hints: what types are allowed for modelFile? 

143 try: 

144 self.module = torch.load(model_file) 

145 self._gpu = self._get_gpu_from_model_parameter_device() 

146 except: 

147 if self._is_cuda_enabled(): 

148 if torch.cuda.device_count() > 0: 

149 new_device = "cuda:0" 

150 else: 

151 new_device = "cpu" 

152 self.log.warning(f"Loading of CUDA model failed, trying to map model to device {new_device}...") 

153 if type(model_file) != str: 

154 model_file.seek(0) 

155 try: 

156 self.module = torch.load(model_file, map_location=new_device) 

157 except: 

158 self.log.warning(f"Failure to map model to device {new_device}, trying CPU...") 

159 if new_device != "cpu": 

160 new_device = "cpu" 

161 self.module = torch.load(model_file, map_location=new_device) 

162 if new_device == "cpu": 

163 self._set_cuda_enabled(False) 

164 self._gpu = None 

165 else: 

166 self._gpu = 0 

167 self.log.info(f"Model successfully loaded to {new_device}") 

168 else: 

169 raise 

170 

171 @abstractmethod 

172 def create_torch_module(self) -> torch.nn.Module: 

173 pass 

174 

175 def __getstate__(self) -> dict: 

176 state = dict(self.__dict__) 

177 del state["module"] 

178 state["modelBytes"] = self.get_module_bytes() 

179 return state 

180 

181 def __setstate__(self, d: dict) -> None: 

182 # backward compatibility 

183 if "bestEpoch" in d: 

184 d["trainingInfo"] = TrainingInfo(best_epoch=d["bestEpoch"]) 

185 del d["bestEpoch"] 

186 new_default_properties = {"_normalisationCheckThreshold": 5} 

187 

188 model_bytes = None 

189 if "modelBytes" in d: 

190 model_bytes = d["modelBytes"] 

191 del d["modelBytes"] 

192 setstate(TorchModel, self, d, new_default_properties=new_default_properties) 

193 if model_bytes is not None: 

194 self.set_module_bytes(model_bytes) 

195 

196 def apply(self, 

197 x: Union[torch.Tensor, np.ndarray, TorchDataSet, Sequence[torch.Tensor]], 

198 as_numpy: bool = True, create_batch: bool = False, 

199 mc_dropout_samples: Optional[int] = None, 

200 mc_dropout_probability: Optional[float] = None, 

201 scale_output: bool = False, 

202 scale_input: bool = False) -> Union[torch.Tensor, np.ndarray, Tuple]: 

203 """ 

204 Applies the model to the given input tensor and returns the result 

205 

206 :param x: the input tensor (either a batch or, if createBatch=True, a single data point), a data set or a tuple/list of tensors 

207 (if the model accepts more than one input). 

208 If it is a data set, it will be processed at once, so the data set must not be too large to be processed at once. 

209 :param as_numpy: flag indicating whether to convert the result to a numpy.array (if False, return tensor) 

210 :param create_batch: whether to add an additional tensor dimension for a batch containing just one data point 

211 :param mc_dropout_samples: if not None, apply MC-Dropout-based inference with the respective number of samples; if None, apply 

212 regular inference 

213 :param mc_dropout_probability: the probability with which to apply dropouts in MC-Dropout-based inference; if None, use model's 

214 default 

215 :param scale_output: whether to scale the output that is produced by the underlying model (using this instance's output scaler, 

216 if any) 

217 :param scale_input: whether to scale the input (using this instance's input scaler, if any) before applying the underlying model 

218 

219 :return: an output tensor or, if MC-Dropout is applied, a pair (y, sd) where y the mean output tensor and sd is a tensor of the 

220 same dimension containing standard deviations 

221 """ 

222 def extract(z): 

223 if scale_output: 

224 z = self.scaled_output(z) 

225 if self._is_cuda_enabled(): 

226 z = z.cpu() 

227 z = z.detach() 

228 if as_numpy: 

229 z = z.numpy() 

230 return z 

231 

232 model = self.get_torch_module() 

233 model.eval() 

234 

235 if isinstance(x, TorchDataSet): 

236 x = next(x.iter_batches(x.size(), input_only=True, shuffle=False)) 

237 elif isinstance(x, np.ndarray): 

238 x = to_float_array(x) 

239 x = torch.from_numpy(x).float() 

240 

241 if type(x) not in (list, tuple): 

242 inputs = [x] 

243 else: 

244 inputs = x 

245 

246 if self._is_cuda_enabled(): 

247 torch.cuda.set_device(self._gpu) 

248 inputs = [t.cuda() for t in inputs] 

249 if scale_input: 

250 inputs = [self.inputScaler.normalise(t) for t in inputs] 

251 if create_batch: 

252 inputs = [t.view(1, *x.size()) for t in inputs] 

253 

254 # check input normalisation 

255 if self._normalisationCheckThreshold is not None: 

256 for i, t in enumerate(inputs): 

257 if t.is_floating_point() and t.numel() > 0: # skip any integer tensors (which typically contain lengths) and empty tensors 

258 max_value = t.abs().max().item() 

259 if max_value > self._normalisationCheckThreshold: 

260 log.warning(f"Received value in input tensor {i} which is likely to not be correctly normalised: " 

261 f"maximum abs. value in tensor is %f" % max_value) 

262 if mc_dropout_samples is None: 

263 y = model(*inputs) 

264 return extract(y) 

265 else: 

266 y, stddev = model.inferMCDropout(x, mc_dropout_samples, p=mc_dropout_probability) 

267 return extract(y), extract(stddev) 

268 

269 def apply_scaled(self, x: Union[torch.Tensor, np.ndarray, TorchDataSet, Sequence[torch.Tensor]], 

270 as_numpy: bool = True, 

271 create_batch: bool = False, 

272 mc_dropout_samples: Optional[int] = None, 

273 mc_dropout_probability: Optional[float] = None) \ 

274 -> Union[torch.Tensor, np.ndarray]: 

275 """ 

276 applies the model to the given input tensor and returns the scaled result (i.e. in the original scale) 

277 

278 :param x: the input tensor(s) or data set 

279 :param as_numpy: flag indicating whether to convert the result to a numpy.array (if False, return tensor) 

280 :param create_batch: whether to add an additional tensor dimension for a batch containing just one data point 

281 :param mc_dropout_samples: if not None, apply MC-Dropout-based inference with the respective number of samples; if None, apply 

282 regular inference 

283 :param mc_dropout_probability: the probability with which to apply dropouts in MC-Dropout-based inference; if None, use model's 

284 default 

285 

286 :return: a scaled output tensor or, if MC-Dropout is applied, a pair (y, sd) of scaled tensors, where 

287 y the mean output tensor and sd is a tensor of the same dimension containing standard deviations 

288 """ 

289 return self.apply(x, scale_output=True, scale_input=True, as_numpy=as_numpy, create_batch=create_batch, 

290 mc_dropout_samples=mc_dropout_samples, mc_dropout_probability=mc_dropout_probability) 

291 

292 def scaled_output(self, output: torch.Tensor) -> torch.Tensor: 

293 return self.outputScaler.denormalise(output) 

294 

295 def _extract_params_from_data(self, data: TorchDataSetProvider) -> None: 

296 self.outputScaler = data.get_output_tensor_scaler() 

297 self.inputScaler = data.get_input_tensor_scaler() 

298 

299 def fit(self, data: TorchDataSetProvider, nn_optimiser_params: NNOptimiserParams, strategy: "TorchModelFittingStrategy" = None) \ 

300 -> None: 

301 """ 

302 Fits this model using the given model and strategy 

303 

304 :param data: a provider for the data with which to fit the model 

305 :param strategy: the fitting strategy; if None, use TorchModelFittingStrategyDefault. 

306 Pass your own strategy to perform custom fitting processes, e.g. process which involve multi-stage learning 

307 :param nn_optimiser_params: the parameters with which to create an optimiser which can be applied in the fitting strategy 

308 """ 

309 self._extract_params_from_data(data) 

310 optimiser = NNOptimiser(nn_optimiser_params) 

311 if strategy is None: 

312 strategy = TorchModelFittingStrategyDefault() 

313 self.trainingInfo = strategy.fit(self, data, optimiser) 

314 self._gpu = self._get_gpu_from_model_parameter_device() 

315 

316 def _get_gpu_from_model_parameter_device(self) -> Optional[int]: 

317 try: 

318 return next(self.module.parameters()).get_device() 

319 except: 

320 return None 

321 

322 @property 

323 def best_epoch(self) -> Optional[int]: 

324 return self.trainingInfo.best_epoch if self.trainingInfo is not None else None 

325 

326 @property 

327 def total_epochs(self) -> Optional[int]: 

328 return self.trainingInfo.total_epochs if self.trainingInfo is not None else None 

329 

330 def _tostring_excludes(self) -> List[str]: 

331 return ['_gpu', 'module', 'trainingInfo', "inputScaler", "outputScaler"] 

332 

333 def _tostring_additional_entries(self): 

334 return dict(bestEpoch=self.best_epoch, totalEpochs=self.total_epochs) 

335 

336 

337class TorchModelFittingStrategy(ABC): 

338 """ 

339 Defines the interface for fitting strategies that can be used in TorchModel.fit 

340 """ 

341 @abstractmethod 

342 def fit(self, model: TorchModel, data: TorchDataSetProvider, nn_optimiser: NNOptimiser) -> Optional[TrainingInfo]: 

343 pass 

344 

345 

346class TorchModelFittingStrategyDefault(TorchModelFittingStrategy): 

347 """ 

348 Represents the default fitting strategy, which simply applies the given optimiser to the model and data 

349 """ 

350 def fit(self, model: TorchModel, data: TorchDataSetProvider, nn_optimiser: NNOptimiser) -> Optional[TrainingInfo]: 

351 return nn_optimiser.fit(model, data) 

352 

353 

354class TorchModelFromModuleFactory(TorchModel): 

355 def __init__(self, module_factory: Callable[..., torch.nn.Module], *args, cuda: bool = True, **kwargs) -> None: 

356 super().__init__(cuda) 

357 self.args = args 

358 self.kwargs = kwargs 

359 self.moduleFactory = module_factory 

360 

361 def create_torch_module(self) -> torch.nn.Module: 

362 return self.moduleFactory(*self.args, **self.kwargs) 

363 

364 

365class TorchModelFromModule(TorchModel): 

366 def __init__(self, module: torch.nn.Module, cuda: bool = True): 

367 super().__init__(cuda=cuda) 

368 self.module = module 

369 

370 def create_torch_module(self) -> torch.nn.Module: 

371 return self.module 

372 

373 

374class TorchModelFactoryFromModule: 

375 """Represents a factory for the creation of a TorchModel based on a torch module""" 

376 def __init__(self, module: torch.nn.Module, cuda: bool = True): 

377 self.module = module 

378 self.cuda = cuda 

379 

380 def __call__(self) -> TorchModel: 

381 return TorchModelFromModule(self.module, self.cuda) 

382 

383 

384class VectorTorchModel(TorchModel, ABC): 

385 """ 

386 Base class for TorchModels that can be used within VectorModels, where the input and output dimensions 

387 are determined by the data 

388 """ 

389 def __init__(self, cuda: bool = True) -> None: 

390 super().__init__(cuda=cuda) 

391 self.inputDim = None 

392 self.outputDim = None 

393 

394 def _extract_params_from_data(self, data: TorchDataSetProvider) -> None: 

395 super()._extract_params_from_data(data) 

396 self.inputDim = data.get_input_dim() 

397 self.outputDim = data.get_model_output_dim() 

398 

399 def create_torch_module(self) -> torch.nn.Module: 

400 return self.create_torch_module_for_dims(self.inputDim, self.outputDim) 

401 

402 @abstractmethod 

403 def create_torch_module_for_dims(self, input_dim: int, output_dim: int) -> torch.nn.Module: 

404 """ 

405 :param input_dim: the number of input dimensions as reported by the data set provider (number of columns 

406 in input data frame for default providers) 

407 :param output_dim: the number of output dimensions as reported by the data set provider (for default providers, 

408 this will be the number of columns in the output data frame or, for classification, the number of classes) 

409 :return: the torch module 

410 """ 

411 pass 

412 

413 

414class TorchAutoregressiveResultHandler(ABC): 

415 """ 

416 Supports the saving of predictions results such that subsequent predictions 

417 can build on earlier predictions, thus supporting autoregressive models. 

418 """ 

419 

420 @abstractmethod 

421 def clear_results(self): 

422 pass 

423 

424 @abstractmethod 

425 def save_results(self, input_df: pd.DataFrame, results: np.ndarray) -> None: 

426 """ 

427 Saves the regression results such that they can be used as input for subsequent prediction steps. 

428 The input will typically be processed by a feature generator or vectoriser, so the result 

429 should be stored in a place from which the respective feature generator or vectoriser can retrieve it. 

430 

431 :param input_df: the input data frame for which results were obtained (number of rows corresponds to 

432 length of `results`) 

433 :param results: the results array, which is typically a 2D array where `results[i]` is an array 

434 containing the results for the i-th input row 

435 """ 

436 pass 

437 

438 

439TTorchVectorRegressionModel = typing.TypeVar("TTorchVectorRegressionModel", bound="TorchVectorRegressionModel") 

440 

441 

442class TorchVectorRegressionModel(VectorRegressionModel): 

443 """ 

444 Base class for the implementation of VectorRegressionModels based on TorchModels. 

445 An instance of this class will have an instance of TorchModel as the underlying model. 

446 """ 

447 

448 def __init__(self, torch_model_factory: Callable[[], TorchModel], 

449 normalisation_mode: NormalisationMode = NormalisationMode.NONE, 

450 nn_optimiser_params: Union[dict, NNOptimiserParams, None] = None) -> None: 

451 """ 

452 :param torch_model_factory: the factory function with which to create the contained TorchModel instance that the instance is to 

453 encapsulate. For the instance to be picklable, this cannot be a lambda or locally defined function. 

454 :param normalisation_mode: the normalisation mode to apply to input data frames 

455 :param nn_optimiser_params: the parameters to apply in NNOptimiser during training 

456 """ 

457 super().__init__() 

458 

459 if nn_optimiser_params is None: 

460 nn_optimiser_params_instance = NNOptimiserParams() 

461 else: 

462 nn_optimiser_params_instance = NNOptimiserParams.from_dict_or_instance(nn_optimiser_params) 

463 if nn_optimiser_params_instance.loss_evaluator is None: 

464 nn_optimiser_params_instance.loss_evaluator = NNLossEvaluatorRegression(NNLossEvaluatorRegression.LossFunction.MSELOSS) 

465 

466 self.torch_model_factory = torch_model_factory 

467 self.normalisationMode = normalisation_mode 

468 self.nnOptimiserParams = nn_optimiser_params_instance 

469 self.model: Optional[TorchModel] = None 

470 self.inputTensoriser: Optional[Tensoriser] = None 

471 self.outputTensoriser: Optional[Tensoriser] = None 

472 self.outputTensorToArrayConverter: Optional[OutputTensorToArrayConverter] = None 

473 self.torchDataSetProviderFactory: Optional[TorchDataSetProviderFactory] = None 

474 self.dataFrameSplitter: Optional[DataFrameSplitter] = None 

475 self._normalisationCheckThreshold = 5 

476 self.inferenceBatchSize: Optional[int] = None 

477 self.autoregressiveResultHandler: Optional[TorchAutoregressiveResultHandler] = None 

478 

479 def __setstate__(self, state) -> None: 

480 if "modelClass" in state: # old-style factory 

481 state["torch_model_factory"] = functools.partial(state["modelClass"], *state["modelArgs"], **state["modelKwArgs"]) 

482 for k in ("modelClass", "modelArgs", "modelKwArgs"): 

483 del state[k] 

484 state["nnOptimiserParams"] = NNOptimiserParams.from_dict_or_instance(state["nnOptimiserParams"]) 

485 new_optional_members = ["inputTensoriser", "torchDataSetProviderFactory", "dataFrameSplitter", "outputTensoriser", 

486 "outputTensorToArrayConverter", "inferenceBatchSize", "autoRegressiveResultHandler"] 

487 new_default_properties = {"_normalisationCheckThreshold": 5} 

488 setstate(TorchVectorRegressionModel, self, state, new_optional_properties=new_optional_members, 

489 new_default_properties=new_default_properties) 

490 

491 @classmethod 

492 def from_module(cls, module: torch.nn.Module, cuda=True, normalisation_mode: NormalisationMode = NormalisationMode.NONE, 

493 nn_optimiser_params: Optional[NNOptimiserParams] = None) -> "TorchVectorRegressionModel": 

494 return cls(TorchModelFactoryFromModule(module=module, cuda=cuda), normalisation_mode=normalisation_mode, 

495 nn_optimiser_params=nn_optimiser_params) 

496 

497 def _tostring_excludes(self) -> List[str]: 

498 excludes = super()._tostring_excludes() 

499 if self.model is not None: 

500 return excludes + ["modelClass", "modelArgs", "modelKwArgs"] 

501 else: 

502 return excludes 

503 

504 def with_input_tensoriser(self: TTorchVectorRegressionModel, tensoriser: Tensoriser) -> TTorchVectorRegressionModel: 

505 """ 

506 :param tensoriser: tensoriser to use in order to convert input data frames to (one or more) tensors. 

507 The default tensoriser directly converts the data frame's values (which is assumed to contain only scalars that 

508 can be coerced to floats) to a float tensor. 

509 The use of a custom tensoriser is necessary if a non-trivial conversion is necessary or if the data frame 

510 is to be converted to more than one input tensor. 

511 :return: self 

512 """ 

513 self.inputTensoriser = tensoriser 

514 return self 

515 

516 def with_output_tensoriser(self: TTorchVectorRegressionModel, tensoriser: RuleBasedTensoriser) -> TTorchVectorRegressionModel: 

517 """ 

518 :param tensoriser: tensoriser to use in order to convert the output data frame to a tensor. 

519 The default output tensoriser directly converts the data frame's values to a float tensor. 

520 

521 NOTE: It is required to be a rule-based tensoriser, because mechanisms that require fitting on the data 

522 and thus perform a data-dependendent conversion are likely to cause problems because they would need 

523 to be reversed at inference time (since the model will be trained on the converted values). If you require 

524 a transformation, use a target transformer, which will be applied before the tensoriser. 

525 :return: self 

526 """ 

527 self.outputTensoriser = tensoriser 

528 return self 

529 

530 def with_output_tensor_to_array_converter(self: TTorchVectorRegressionModel, 

531 output_tensor_to_array_converter: "OutputTensorToArrayConverter") -> TTorchVectorRegressionModel: 

532 """ 

533 Configures the use of a custom converter from tensors to numpy arrays, which is applied during inference. 

534 A custom converter can be required, for example, to handle variable-length outputs (where the output tensor 

535 will typically contain unwanted padding). Note that since the converter is for inference only, it may be 

536 required to use a custom loss evaluator during training if the use of a custom converter is necessary. 

537 

538 :param output_tensor_to_array_converter: the converter 

539 :return: self 

540 """ 

541 self.outputTensorToArrayConverter = output_tensor_to_array_converter 

542 return self 

543 

544 def with_torch_data_set_provider_factory(self: TTorchVectorRegressionModel, 

545 torch_data_set_provider_factory: "TorchDataSetProviderFactory") -> TTorchVectorRegressionModel: 

546 """ 

547 :param torch_data_set_provider_factory: the torch data set provider factory, which is used to instantiate the provider which 

548 will provide the training and validation data sets from the input data frame that is passed in for learning. 

549 By default, TorchDataSetProviderFactoryRegressionDefault is used. 

550 :return: self 

551 """ 

552 self.torchDataSetProviderFactory = torch_data_set_provider_factory 

553 return self 

554 

555 def with_data_frame_splitter(self: TTorchVectorRegressionModel, data_frame_splitter: DataFrameSplitter) -> TTorchVectorRegressionModel: 

556 """ 

557 :param data_frame_splitter: the data frame splitter which is used to split the input/output data frames that are passed for 

558 learning into a data frame that is used for training and a data frame that is used for validation. 

559 The input data frame is the data frame that is passed as input to the splitter, and the returned indices 

560 are used to split both the input and output data frames in the same way. 

561 :return: self 

562 """ 

563 self.dataFrameSplitter = data_frame_splitter 

564 return self 

565 

566 def with_normalisation_check_threshold(self: TTorchVectorRegressionModel, threshold: Optional[float]) -> TTorchVectorRegressionModel: 

567 """ 

568 Defines a threshold with which to check inputs that are passed to the underlying neural network. 

569 Whenever an (absolute) input value exceeds the threshold, a warning is triggered. 

570 

571 :param threshold: the threshold 

572 :return: self 

573 """ 

574 self._normalisationCheckThreshold = threshold 

575 return self 

576 

577 def with_autoregressive_result_handler(self: TTorchVectorRegressionModel, 

578 result_handler: TorchAutoregressiveResultHandler, 

579 inference_batch_size=1) -> TTorchVectorRegressionModel: 

580 """ 

581 Adds a result handler which can be used to store prediction results such that subsequent predictions 

582 can use the prediction result, supporting autoregressive models. 

583 The autoregressive predictions are assumed to be handled in a single call to method :meth:`predict`, 

584 and the results will be stored for the duration of the call. 

585 For autoregressive predictions that build on earlier predictions, we must typically restrict 

586 the batch size such that predictions from the earlier batch can be saved and correctly reused 

587 as input for the subsequent predictions. The models input preprocessors (such as feature generators 

588 or vectorisers) must make ensure that the results being stored by the result handler are appropriately 

589 used as input. 

590 

591 :param result_handler: the result handler 

592 :param inference_batch_size: the batch size to use for predictions 

593 :return: self 

594 """ 

595 self.autoregressiveResultHandler = result_handler 

596 self.inferenceBatchSize = inference_batch_size 

597 return self 

598 

599 def _create_torch_model(self) -> TorchModel: 

600 torch_model = self.torch_model_factory() 

601 torch_model.set_normalisation_check_threshold(self._normalisationCheckThreshold) 

602 return torch_model 

603 

604 def _create_data_set_provider(self, inputs: pd.DataFrame, outputs: pd.DataFrame) -> TorchDataSetProvider: 

605 factory = self.torchDataSetProviderFactory 

606 if factory is None: 

607 factory = TorchDataSetProviderFactoryRegressionDefault() 

608 return factory.create_data_set_provider(inputs, outputs, self, self._trainingContext, input_tensoriser=self.inputTensoriser, 

609 output_tensoriser=self.outputTensoriser, data_frame_splitter=self.dataFrameSplitter) 

610 

611 def _fit(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[pd.Series] = None) -> None: 

612 self._warn_sample_weights_unsupported(False, weights) 

613 if self.inputTensoriser is not None: 

614 log.info(f"Fitting {self.inputTensoriser} ...") 

615 self.inputTensoriser.fit(inputs, model=self) 

616 self.model = self._create_torch_model() 

617 data_set_provider = self._create_data_set_provider(inputs, outputs) 

618 self.model.fit(data_set_provider, self.nnOptimiserParams) 

619 

620 def _predict_outputs_for_input_data_frame(self, inputs: pd.DataFrame) -> np.ndarray: 

621 tensorise_dynamically = False 

622 if self.autoregressiveResultHandler is not None: 

623 self.autoregressiveResultHandler.clear_results() 

624 tensorise_dynamically = True # must be dynamically tensorised to allow inclusion of predicted results 

625 batch_size = self.nnOptimiserParams.batch_size if self.inferenceBatchSize is None else self.inferenceBatchSize 

626 results: List[np.ndarray] = [] 

627 data_set = TorchDataSetFromDataFrames(inputs, None, self.model.cuda, input_tensoriser=self.inputTensoriser, 

628 tensorise_dynamically=tensorise_dynamically) 

629 start_idx = 0 

630 for input_batch in data_set.iter_batches(batch_size, input_only=True): 

631 if self.outputTensorToArrayConverter is None: 

632 result = self.model.apply_scaled(input_batch, as_numpy=True) 

633 else: 

634 output_batch = self.model.apply_scaled(input_batch, as_numpy=False) 

635 result = self.outputTensorToArrayConverter.convert(output_batch, input_batch) 

636 if self.autoregressiveResultHandler is not None: 

637 self.autoregressiveResultHandler.save_results(inputs.iloc[start_idx:start_idx+len(result)], result) 

638 start_idx += len(result) 

639 results.append(result) 

640 if self.autoregressiveResultHandler is not None: 

641 self.autoregressiveResultHandler.clear_results() 

642 return np.concatenate(results) 

643 

644 def _predict(self, inputs: pd.DataFrame) -> pd.DataFrame: 

645 y_array = self._predict_outputs_for_input_data_frame(inputs) 

646 return pd.DataFrame(y_array, columns=self.get_model_output_variable_names()) 

647 

648 

649TTorchVectorClassificationModel = typing.TypeVar("TTorchVectorClassificationModel", bound="TorchVectorClassificationModel") 

650 

651 

652class TorchVectorClassificationModel(VectorClassificationModel): 

653 """ 

654 Base class for the implementation of VectorClassificationModels based on TorchModels. 

655 An instance of this class will have an instance of TorchModel as the underlying model. 

656 """ 

657 def __init__(self, output_mode: ClassificationOutputMode, 

658 torch_model_factory: Callable[[], TorchModel], 

659 normalisation_mode: NormalisationMode = NormalisationMode.NONE, 

660 nn_optimiser_params: Optional[NNOptimiserParams] = None) -> None: 

661 """ 

662 :param output_mode: specifies the nature of the output of the underlying neural network model 

663 :param torch_model_factory: the factory function with which to create the contained TorchModel instance that the instance is to 

664 encapsulate. For the instance to be picklable, this cannot be a lambda or locally defined function. 

665 :param normalisation_mode: the normalisation mode to apply to input data frames 

666 :param nn_optimiser_params: the parameters to apply in NNOptimiser during training 

667 """ 

668 super().__init__() 

669 

670 if nn_optimiser_params is None: 

671 nn_optimiser_params = NNOptimiserParams() 

672 if nn_optimiser_params.loss_evaluator is None: 

673 loss_function = NNLossEvaluatorClassification.LossFunction.default_for_output_mode(output_mode) 

674 nn_optimiser_params.loss_evaluator = NNLossEvaluatorClassification(loss_function) 

675 

676 self.outputMode = output_mode 

677 self.torch_model_factory = torch_model_factory 

678 self.normalisationMode = normalisation_mode 

679 self.nnOptimiserParams: NNOptimiserParams = nn_optimiser_params 

680 self.model: Optional[TorchModel] = None 

681 self.inputTensoriser: Optional[Tensoriser] = None 

682 self.outputTensoriser: Optional[Tensoriser] = None 

683 self.torchDataSetProviderFactory: Optional[TorchDataSetProviderFactory] = None 

684 self.dataFrameSplitter: Optional[DataFrameSplitter] = None 

685 self._normalisationCheckThreshold = 5 

686 

687 # noinspection DuplicatedCode 

688 def __setstate__(self, state) -> None: 

689 if "modelClass" in state: # old-style factory 

690 state["torch_model_factory"] = functools.partial(state["modelClass"], *state["modelArgs"], **state["modelKwArgs"]) 

691 for k in ("modelClass", "modelArgs", "modelKwArgs"): 

692 del state[k] 

693 state["nnOptimiserParams"] = NNOptimiserParams.from_dict_or_instance(state["nnOptimiserParams"]) 

694 new_optional_members = ["inputTensoriser", "torchDataSetProviderFactory", "dataFrameSplitter", "outputTensoriser"] 

695 new_default_properties = {"outputMode": ClassificationOutputMode.PROBABILITIES, "_normalisationCheckThreshold": 5} 

696 setstate(TorchVectorClassificationModel, self, state, new_optional_properties=new_optional_members, 

697 new_default_properties=new_default_properties) 

698 

699 @classmethod 

700 def from_module(cls, module: torch.nn.Module, output_mode: ClassificationOutputMode, cuda=True, 

701 normalisation_mode: NormalisationMode = NormalisationMode.NONE, 

702 nn_optimiser_params: Optional[NNOptimiserParams] = None) -> "TorchVectorClassificationModel": 

703 return cls(output_mode, TorchModelFactoryFromModule(module, cuda=cuda), 

704 normalisation_mode=normalisation_mode, nn_optimiser_params=nn_optimiser_params) 

705 

706 def with_input_tensoriser(self: TTorchVectorClassificationModel, tensoriser: Tensoriser) -> TTorchVectorClassificationModel: 

707 """ 

708 :param tensoriser: tensoriser to use in order to convert input data frames to (one or more) tensors. 

709 The default tensoriser directly converts the data frame's values (which is assumed to contain only scalars that 

710 can be coerced to floats) to a float tensor. 

711 The use of a custom tensoriser is necessary if a non-trivial conversion is necessary or if the data frame 

712 is to be converted to more than one input tensor. 

713 :return: self 

714 """ 

715 self.inputTensoriser = tensoriser 

716 return self 

717 

718 def with_output_tensoriser(self: TTorchVectorClassificationModel, tensoriser: RuleBasedTensoriser) -> TTorchVectorClassificationModel: 

719 """ 

720 :param tensoriser: tensoriser to use in order to convert the output data frame to a tensor. 

721 NOTE: It is required to be a rule-based tensoriser, because mechanisms that require fitting on the data 

722 and thus perform a data-dependendent conversion are likely to cause problems because they would need 

723 to be reversed at inference time (since the model will be trained on the converted values). If you require 

724 a transformation, use a target transformer, which will be applied before the tensoriser. 

725 """ 

726 self.outputTensoriser = tensoriser 

727 return self 

728 

729 def with_torch_data_set_provider_factory(self: TTorchVectorClassificationModel, 

730 torch_data_set_provider_factory: "TorchDataSetProviderFactory") -> TTorchVectorClassificationModel: 

731 """ 

732 :param torch_data_set_provider_factory: the torch data set provider factory, which is used to instantiate the provider which 

733 will provide the training and validation data sets from the input data frame that is passed in for learning. 

734 By default, TorchDataSetProviderFactoryClassificationDefault is used. 

735 :return: self 

736 """ 

737 self.torchDataSetProviderFactory = torch_data_set_provider_factory 

738 return self 

739 

740 def with_data_frame_splitter(self: TTorchVectorClassificationModel, data_frame_splitter: DataFrameSplitter) \ 

741 -> TTorchVectorClassificationModel: 

742 """ 

743 :param data_frame_splitter: the data frame splitter which is used to split the input/output data frames that are passed for 

744 learning into a data frame that is used for training and a data frame that is used for validation. 

745 The input data frame is the data frame that is passed as input to the splitter, and the returned indices 

746 are used to split both the input and output data frames in the same way. 

747 :return: self 

748 """ 

749 self.dataFrameSplitter = data_frame_splitter 

750 return self 

751 

752 def with_normalisation_check_threshold(self: TTorchVectorClassificationModel, threshold: Optional[float]) \ 

753 -> TTorchVectorClassificationModel: 

754 """ 

755 Defines a threshold with which to check inputs that are passed to the underlying neural network. 

756 Whenever an (absolute) input value exceeds the threshold, a warning is triggered. 

757 

758 :param threshold: the threshold 

759 :return: self 

760 """ 

761 self._normalisationCheckThreshold = threshold 

762 return self 

763 

764 def _create_torch_model(self) -> TorchModel: 

765 torch_model = self.torch_model_factory() 

766 torch_model.set_normalisation_check_threshold(self._normalisationCheckThreshold) 

767 return torch_model 

768 

769 def _create_data_set_provider(self, inputs: pd.DataFrame, outputs: pd.DataFrame) -> TorchDataSetProvider: 

770 factory = self.torchDataSetProviderFactory 

771 if factory is None: 

772 factory = TorchDataSetProviderFactoryClassificationDefault() 

773 return factory.create_data_set_provider(inputs, outputs, self, self._trainingContext, input_tensoriser=self.inputTensoriser, 

774 output_tensoriser=self.outputTensoriser, data_frame_splitter=self.dataFrameSplitter) 

775 

776 def _fit_classifier(self, inputs: pd.DataFrame, outputs: pd.DataFrame, weights: Optional[pd.Series] = None) -> None: 

777 self._warn_sample_weights_unsupported(False, weights) 

778 if len(outputs.columns) != 1: 

779 raise ValueError("Expected one output dimension: the class labels") 

780 

781 if self.inputTensoriser is not None: 

782 log.info(f"Fitting {self.inputTensoriser} ...") 

783 self.inputTensoriser.fit(inputs, model=self) 

784 

785 # transform outputs: for each data point, the new output shall be the index in the list of labels 

786 labels: pd.Series = outputs.iloc[:, 0] 

787 outputs = pd.DataFrame([self._labels.index(l) for l in labels], columns=outputs.columns, index=outputs.index) 

788 

789 self.model = self._create_torch_model() 

790 

791 data_set_provider = self._create_data_set_provider(inputs, outputs) 

792 self.model.fit(data_set_provider, self.nnOptimiserParams) 

793 

794 def _predict_outputs_for_input_data_frame(self, inputs: pd.DataFrame) -> torch.Tensor: 

795 batch_size = self.nnOptimiserParams.batch_size 

796 results = [] 

797 data_set = TorchDataSetFromDataFrames(inputs, None, self.model.cuda, input_tensoriser=self.inputTensoriser) 

798 for inputBatch in data_set.iter_batches(batch_size, input_only=True): 

799 results.append(self.model.apply_scaled(inputBatch, as_numpy=False)) 

800 return torch.cat(results, dim=0) 

801 

802 def _predict_class_probabilities(self, inputs: pd.DataFrame) -> pd.DataFrame: 

803 y = self._predict_outputs_for_input_data_frame(inputs) 

804 if self.outputMode == ClassificationOutputMode.PROBABILITIES: 

805 pass 

806 elif self.outputMode == ClassificationOutputMode.LOG_PROBABILITIES: 

807 y = y.exp() 

808 elif self.outputMode == ClassificationOutputMode.UNNORMALISED_LOG_PROBABILITIES: 

809 y = y.softmax(dim=1) 

810 else: 

811 raise ValueError(f"Unhandled output mode {self.outputMode}") 

812 return pd.DataFrame(y.numpy(), columns=self._labels) 

813 

814 def _tostring_excludes(self) -> List[str]: 

815 excludes = super()._tostring_excludes() 

816 if self.model is not None: 

817 return excludes + ["modelClass", "modelArgs", "modelKwArgs"] 

818 else: 

819 return excludes 

820 

821 

822class TorchDataSetProviderFactory(ABC): 

823 @abstractmethod 

824 def create_data_set_provider(self, 

825 inputs: pd.DataFrame, 

826 outputs: pd.DataFrame, 

827 model: Union[TorchVectorRegressionModel, TorchVectorClassificationModel], 

828 training_context: TrainingContext, 

829 input_tensoriser: Optional[Tensoriser], 

830 output_tensoriser: Optional[Tensoriser], 

831 data_frame_splitter: Optional[DataFrameSplitter]) -> TorchDataSetProvider: 

832 pass 

833 

834 

835class TorchDataSetProviderFactoryClassificationDefault(TorchDataSetProviderFactory): 

836 def __init__(self, tensorise_dynamically=False): 

837 """ 

838 :param tensorise_dynamically: whether tensorisation shall take place on the fly whenever the provided data sets are iterated; 

839 if False, tensorisation takes place once in a precomputation stage (tensors must jointly fit into memory) 

840 """ 

841 self.tensoriseDynamically = tensorise_dynamically 

842 

843 def create_data_set_provider(self, 

844 inputs: pd.DataFrame, 

845 outputs: pd.DataFrame, 

846 model: TorchVectorClassificationModel, 

847 training_context: TrainingContext, 

848 input_tensoriser: Optional[Tensoriser], 

849 output_tensoriser: Optional[Tensoriser], 

850 data_frame_splitter: Optional[DataFrameSplitter]) -> TorchDataSetProvider: 

851 data_util = ClassificationVectorDataUtil(inputs, outputs, model.model.cuda, len(model._labels), # TODO FIXME 

852 normalisation_mode=model.normalisationMode, input_tensoriser=input_tensoriser, output_tensoriser=output_tensoriser, 

853 data_frame_splitter=data_frame_splitter) 

854 return TorchDataSetProviderFromVectorDataUtil(data_util, model.model.cuda, tensorise_dynamically=self.tensoriseDynamically) 

855 

856 

857class TorchDataSetProviderFactoryRegressionDefault(TorchDataSetProviderFactory): 

858 def __init__(self, tensorise_dynamically=False): 

859 """ 

860 :param tensorise_dynamically: whether tensorisation shall take place on the fly whenever the provided data sets are iterated; 

861 if False, tensorisation takes place once in a precomputation stage (tensors must jointly fit into memory) 

862 """ 

863 self.tensoriseDynamically = tensorise_dynamically 

864 

865 def create_data_set_provider(self, inputs: pd.DataFrame, outputs: pd.DataFrame, model: TorchVectorRegressionModel, 

866 training_context: TrainingContext, input_tensoriser: Optional[Tensoriser], output_tensoriser: Optional[Tensoriser], 

867 data_frame_splitter: Optional[DataFrameSplitter]) -> TorchDataSetProvider: 

868 data_util = VectorDataUtil(inputs, outputs, model.model.cuda, normalisation_mode=model.normalisationMode, 

869 input_tensoriser=input_tensoriser, output_tensoriser=output_tensoriser, data_frame_splitter=data_frame_splitter) 

870 return TorchDataSetProviderFromVectorDataUtil(data_util, model.model.cuda, tensorise_dynamically=self.tensoriseDynamically) 

871 

872 

873class OutputTensorToArrayConverter(ABC): 

874 @abstractmethod 

875 def convert(self, model_output: torch.Tensor, model_input: Union[torch.Tensor, Sequence[torch.Tensor]]) -> np.ndarray: 

876 """ 

877 :param model_output: the output tensor generated by the model 

878 :param model_input: the input tensor(s) for which the model produced the output (which may provide relevant meta-data) 

879 :return: a numpy array of shape (N, D) where N=output.shape[0] is the number of data points and D is the number of 

880 variables predicted by the model  

881 """ 

882 pass