Coverage for src/sensai/torch/torch_base.py: 70%

452 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-13 22:17 +0000

1import functools 

2import io 

3import logging 

4import typing 

5from abc import ABC, abstractmethod 

6from typing import Union, Tuple, Callable, Optional, List, Sequence 

7 

8import numpy as np 

9import pandas as pd 

10import torch 

11from torch import nn 

12from torch.nn import functional as F 

13 

14from .torch_data import TensorScaler, VectorDataUtil, ClassificationVectorDataUtil, TorchDataSet, \ 

15 TorchDataSetProvider, Tensoriser, TorchDataSetFromDataFrames, RuleBasedTensoriser, \ 

16 TorchDataSetProviderFromVectorDataUtil 

17from .torch_enums import ClassificationOutputMode 

18from .torch_opt import NNOptimiser, NNLossEvaluatorRegression, NNLossEvaluatorClassification, NNOptimiserParams, TrainingInfo 

19from ..data import DataFrameSplitter 

20from ..normalisation import NormalisationMode 

21from ..util.dtype import to_float_array 

22from ..util.pickle import setstate 

23from ..util.string import ToStringMixin 

24from ..vector_model import VectorRegressionModel, VectorClassificationModel, TrainingContext 

25 

# Module-level logger; TorchModel derives its class-level child logger from it.
log: logging.Logger = logging.getLogger(__name__)

27 

28 

class MCDropoutCapableNNModule(nn.Module, ABC):
    """
    Base class for NN modules that are to support MC-Dropout.
    Support can be added by applying the :meth:`_dropout` function in the module's forward method.
    Then, to apply inference that samples results, call :meth:`infer_mc_dropout` rather than just using __call__.
    """

    def __init__(self) -> None:
        super().__init__()
        self._applyMCDropout = False
        self._pMCDropoutOverride = None

    def __setstate__(self, d: dict) -> None:
        # backward compatibility: state persisted before these members existed lacks them
        d.setdefault("_applyMCDropout", False)
        d.setdefault("_pMCDropoutOverride", None)
        super().__setstate__(d)

    def _dropout(self, x: torch.Tensor, p_training=None, p_inference=None) -> torch.Tensor:
        """
        This method is to be applied within the module's forward method to apply dropouts during training and/or inference.

        :param x: the model input tensor
        :param p_training: the probability with which to apply dropouts during training; if None, apply no dropout
        :param p_inference: the probability with which to apply dropouts during MC-Dropout-based inference
            (via :meth:`infer_mc_dropout`, which may override the probability via its optional argument);
            if None, a dropout is not to be applied
        :return: a potentially modified version of x with some elements dropped out, depending on application context and dropout
            probabilities
        """
        if self.training and p_training is not None:
            return F.dropout(x, p_training)
        elif not self.training and self._applyMCDropout and p_inference is not None:
            # the override set by infer_mc_dropout takes precedence over the probability given in forward
            p = p_inference if self._pMCDropoutOverride is None else self._pMCDropoutOverride
            return F.dropout(x, p)
        else:
            return x

    def _enable_mc_dropout(self, enabled=True, p_mc_dropout_override=None) -> None:
        """
        Switches MC-Dropout on or off for subsequent calls of :meth:`_dropout` in inference (non-training) mode.

        :param enabled: whether MC-Dropout shall be applied
        :param p_mc_dropout_override: the dropout probability overriding the one passed to :meth:`_dropout`; if None, no override
        """
        self._applyMCDropout = enabled
        self._pMCDropoutOverride = p_mc_dropout_override

    def infer_mc_dropout(self, x: Union[torch.Tensor, Sequence[torch.Tensor]], num_samples, p=None) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Applies inference using MC-Dropout, drawing the given number of samples.

        :param x: the model input (a tensor or tuple/list of tensors)
        :param num_samples: the number of samples to draw with MC-Dropout
        :param p: the dropout probability to apply, overriding the probability specified by the model's forward method; if None, use model's
            default
        :return: a pair (y, sd) where y the mean output tensor and sd is a tensor of the same dimension containing standard deviations
        """
        # isinstance (rather than an exact type check) so that tuple/list subclasses such as namedtuples
        # are also unpacked into positional module arguments
        if not isinstance(x, (tuple, list)):
            x = [x]
        self._enable_mc_dropout(True, p_mc_dropout_override=p)
        try:
            samples = [self(*x) for _ in range(num_samples)]
        finally:
            # always restore regular inference behaviour, even if the forward pass fails
            self._enable_mc_dropout(False)
        stacked = torch.stack(samples)
        mean = torch.mean(stacked, 0)
        stddev = torch.std(stacked, 0, unbiased=False)
        return mean, stddev

95 

96 

class TorchModel(ABC, ToStringMixin):
    """
    sensAI abstraction for torch models, which supports one-line training, allows for convenient model application,
    has basic mechanisms for data scaling, and soundly handles persistence (via pickle).
    An instance wraps a torch.nn.Module, which is constructed on demand during training via the factory method
    :meth:`create_torch_module`.
    """
    log: logging.Logger = log.getChild(__qualname__)

    def __init__(self, cuda=True) -> None:
        """
        :param cuda: whether CUDA shall be used
        """
        self.cuda: bool = cuda
        self.module: Optional[torch.nn.Module] = None
        self.outputScaler: Optional[TensorScaler] = None
        self.inputScaler: Optional[TensorScaler] = None
        self.trainingInfo: Optional[TrainingInfo] = None
        self._gpu: Optional[int] = None
        self._normalisationCheckThreshold: Optional[float] = 5

    def _tostring_exclude_private(self) -> bool:
        return True

    def set_torch_module(self, module: torch.nn.Module) -> None:
        self.module = module

    def set_normalisation_check_threshold(self, threshold: Optional[float]):
        """
        :param threshold: the absolute input value above which :meth:`apply` logs a warning; None disables the check
        """
        self._normalisationCheckThreshold = threshold

    def get_module_bytes(self) -> bytes:
        """
        :return: the wrapped torch module serialised via torch.save
        """
        bytes_io = io.BytesIO()
        torch.save(self.module, bytes_io)
        return bytes_io.getvalue()

    def set_module_bytes(self, model_bytes: bytes) -> None:
        """
        :param model_bytes: a serialised module as returned by :meth:`get_module_bytes`
        """
        model_file = io.BytesIO(model_bytes)
        self._load_model(model_file)

    def get_torch_module(self) -> torch.nn.Module:
        return self.module

    def _set_cuda_enabled(self, is_cuda_enabled: bool) -> None:
        self.cuda = is_cuda_enabled

    def _is_cuda_enabled(self) -> bool:
        return self.cuda

    def _load_model(self, model_file) -> None:  # TODO: complete type hints: what types are allowed for modelFile?
        """
        Loads the torch module from the given file; if regular loading fails and CUDA is enabled, retries by mapping
        the model to the first CUDA device (if any) and finally to the CPU.

        :param model_file: a path or a file-like object as accepted by torch.load
        """
        # NOTE: torch.load unpickles arbitrary objects; only load data from trusted sources.
        # NOTE(review): PyTorch >= 2.6 defaults to weights_only=True in torch.load, which presumably rejects
        #   fully pickled modules as written by get_module_bytes - confirm against the supported torch versions.

        def rewind() -> None:
            # a stream that was (partially) consumed by a failed attempt must be reset before retrying
            if hasattr(model_file, "seek"):
                model_file.seek(0)

        try:
            self.module = torch.load(model_file)
            self._gpu = self._get_gpu_from_model_parameter_device()
        except Exception:
            if not self._is_cuda_enabled():
                raise
            new_device = "cuda:0" if torch.cuda.device_count() > 0 else "cpu"
            self.log.warning(f"Loading of CUDA model failed, trying to map model to device {new_device}...")
            rewind()
            try:
                self.module = torch.load(model_file, map_location=new_device)
            except Exception:
                if new_device == "cpu":
                    # there is no further fallback; do not pretend that the model was loaded
                    raise
                self.log.warning(f"Failure to map model to device {new_device}, trying CPU...")
                new_device = "cpu"
                rewind()
                self.module = torch.load(model_file, map_location=new_device)
            if new_device == "cpu":
                self._set_cuda_enabled(False)
                self._gpu = None
            else:
                self._gpu = 0
            self.log.info(f"Model successfully loaded to {new_device}")

    @abstractmethod
    def create_torch_module(self) -> torch.nn.Module:
        pass

    def __getstate__(self) -> dict:
        # the module is persisted in serialised form (torch.save) rather than as a regular attribute
        state = dict(self.__dict__)
        del state["module"]
        state["modelBytes"] = self.get_module_bytes()
        return state

    def __setstate__(self, d: dict) -> None:
        # backward compatibility
        if "bestEpoch" in d:
            d["trainingInfo"] = TrainingInfo(best_epoch=d["bestEpoch"])
            del d["bestEpoch"]
        new_default_properties = {"_normalisationCheckThreshold": 5}

        model_bytes = d.pop("modelBytes", None)
        setstate(TorchModel, self, d, new_default_properties=new_default_properties)
        # restore the module only after the remaining state (particularly the cuda flag) has been set
        if model_bytes is not None:
            self.set_module_bytes(model_bytes)

    def apply(self,
            x: Union[torch.Tensor, np.ndarray, TorchDataSet, Sequence[torch.Tensor]],
            as_numpy: bool = True, create_batch: bool = False,
            mc_dropout_samples: Optional[int] = None,
            mc_dropout_probability: Optional[float] = None,
            scale_output: bool = False,
            scale_input: bool = False) -> Union[torch.Tensor, np.ndarray, Tuple]:
        """
        Applies the model to the given input tensor and returns the result

        :param x: the input tensor (either a batch or, if create_batch=True, a single data point), a data set or a tuple/list of tensors
            (if the model accepts more than one input).
            If it is a data set, it will be processed at once, so the data set must not be too large to be processed at once.
        :param as_numpy: flag indicating whether to convert the result to a numpy.array (if False, return tensor)
        :param create_batch: whether to add an additional tensor dimension for a batch containing just one data point
        :param mc_dropout_samples: if not None, apply MC-Dropout-based inference with the respective number of samples; if None, apply
            regular inference
        :param mc_dropout_probability: the probability with which to apply dropouts in MC-Dropout-based inference; if None, use model's
            default
        :param scale_output: whether to scale the output that is produced by the underlying model (using this instance's output scaler)
        :param scale_input: whether to scale the input (using this instance's input scaler) before applying the underlying model

        :return: an output tensor or, if MC-Dropout is applied, a pair (y, sd) where y the mean output tensor and sd is a tensor of the
            same dimension containing standard deviations
        """
        def extract(z):
            if scale_output:
                z = self.scaled_output(z)
            if self._is_cuda_enabled():
                z = z.cpu()
            z = z.detach()
            if as_numpy:
                z = z.numpy()
            return z

        model = self.get_torch_module()
        model.eval()

        if isinstance(x, TorchDataSet):
            x = next(x.iter_batches(x.size(), input_only=True, shuffle=False))
        elif isinstance(x, np.ndarray):
            x = to_float_array(x)
            x = torch.from_numpy(x).float()

        if type(x) not in (list, tuple):
            inputs = [x]
        else:
            inputs = x

        if self._is_cuda_enabled():
            torch.cuda.set_device(self._gpu)
            inputs = [t.cuda() for t in inputs]
        if scale_input:
            inputs = [self.inputScaler.normalise(t) for t in inputs]
        if create_batch:
            # each tensor is extended by its own shape (the input may be a list of differently shaped tensors)
            inputs = [t.view(1, *t.size()) for t in inputs]

        # check input normalisation
        if self._normalisationCheckThreshold is not None:
            for i, t in enumerate(inputs):
                if t.is_floating_point() and t.numel() > 0:  # skip any integer tensors (which typically contain lengths) and empty tensors
                    max_value = t.abs().max().item()
                    if max_value > self._normalisationCheckThreshold:
                        log.warning("Received value in input tensor %d which is likely to not be correctly normalised: "
                            "maximum abs. value in tensor is %f", i, max_value)
        if mc_dropout_samples is None:
            y = model(*inputs)
            return extract(y)
        else:
            # use the preprocessed inputs (device, scaling, batch dimension), exactly as in regular inference
            y, stddev = model.infer_mc_dropout(inputs, mc_dropout_samples, p=mc_dropout_probability)
            return extract(y), extract(stddev)

    def apply_scaled(self, x: Union[torch.Tensor, np.ndarray, TorchDataSet, Sequence[torch.Tensor]],
            as_numpy: bool = True,
            create_batch: bool = False,
            mc_dropout_samples: Optional[int] = None,
            mc_dropout_probability: Optional[float] = None) \
            -> Union[torch.Tensor, np.ndarray]:
        """
        applies the model to the given input tensor and returns the scaled result (i.e. in the original scale)

        :param x: the input tensor(s) or data set
        :param as_numpy: flag indicating whether to convert the result to a numpy.array (if False, return tensor)
        :param create_batch: whether to add an additional tensor dimension for a batch containing just one data point
        :param mc_dropout_samples: if not None, apply MC-Dropout-based inference with the respective number of samples; if None, apply
            regular inference
        :param mc_dropout_probability: the probability with which to apply dropouts in MC-Dropout-based inference; if None, use model's
            default

        :return: a scaled output tensor or, if MC-Dropout is applied, a pair (y, sd) of scaled tensors, where
            y the mean output tensor and sd is a tensor of the same dimension containing standard deviations
        """
        return self.apply(x, scale_output=True, scale_input=True, as_numpy=as_numpy, create_batch=create_batch,
            mc_dropout_samples=mc_dropout_samples, mc_dropout_probability=mc_dropout_probability)

    def scaled_output(self, output: torch.Tensor) -> torch.Tensor:
        """
        :param output: a tensor as produced by the underlying module
        :return: the result of applying the output scaler's denormalise method to the tensor
        """
        return self.outputScaler.denormalise(output)

    def _extract_params_from_data(self, data: TorchDataSetProvider) -> None:
        self.outputScaler = data.get_output_tensor_scaler()
        self.inputScaler = data.get_input_tensor_scaler()

    def fit(self, data: TorchDataSetProvider, nn_optimiser_params: NNOptimiserParams, strategy: "TorchModelFittingStrategy" = None) \
            -> None:
        """
        Fits this model using the given data and strategy

        :param data: a provider for the data with which to fit the model
        :param strategy: the fitting strategy; if None, use TorchModelFittingStrategyDefault.
            Pass your own strategy to perform custom fitting processes, e.g. process which involve multi-stage learning
        :param nn_optimiser_params: the parameters with which to create an optimiser which can be applied in the fitting strategy
        """
        self._extract_params_from_data(data)
        optimiser = NNOptimiser(nn_optimiser_params)
        if strategy is None:
            strategy = TorchModelFittingStrategyDefault()
        self.trainingInfo = strategy.fit(self, data, optimiser)
        self._gpu = self._get_gpu_from_model_parameter_device()

    def _get_gpu_from_model_parameter_device(self) -> Optional[int]:
        """
        :return: the device index reported by the module's first parameter, or None if it cannot be determined
            (e.g. because there is no module or the module has no parameters)
        """
        try:
            return next(self.module.parameters()).get_device()
        except Exception:
            return None

    @property
    def best_epoch(self) -> Optional[int]:
        return self.trainingInfo.best_epoch if self.trainingInfo is not None else None

    @property
    def total_epochs(self) -> Optional[int]:
        return self.trainingInfo.total_epochs if self.trainingInfo is not None else None

    def _tostring_excludes(self) -> List[str]:
        return ['_gpu', 'module', 'trainingInfo', "inputScaler", "outputScaler"]

    def _tostring_additional_entries(self):
        return dict(bestEpoch=self.best_epoch, totalEpochs=self.total_epochs)

335 

336 

class TorchModelFittingStrategy(ABC):
    """
    Interface for the fitting strategies that can be passed to TorchModel.fit
    """

    @abstractmethod
    def fit(self, model: TorchModel, data: TorchDataSetProvider, nn_optimiser: NNOptimiser) -> Optional[TrainingInfo]:
        """
        Fits the given model.

        :param model: the model to fit
        :param data: the provider of the data to fit the model with
        :param nn_optimiser: the optimiser that may be applied for fitting
        :return: information on the training process (may be None)
        """

344 

345 

class TorchModelFittingStrategyDefault(TorchModelFittingStrategy):
    """
    The default fitting strategy: the given optimiser is applied directly to the model and the data
    """

    def fit(self, model: TorchModel, data: TorchDataSetProvider, nn_optimiser: NNOptimiser) -> Optional[TrainingInfo]:
        """
        :param model: the model to fit
        :param data: the provider of the data to fit the model with
        :param nn_optimiser: the optimiser whose fit method performs the training
        :return: whatever the optimiser's fit method returns
        """
        training_info = nn_optimiser.fit(model, data)
        return training_info

352 

353 

class TorchModelFromModuleFactory(TorchModel):
    """
    A TorchModel whose torch module is created by calling a factory with stored arguments
    """

    def __init__(self, module_factory: Callable[..., torch.nn.Module], *args, cuda: bool = True, **kwargs) -> None:
        """
        :param module_factory: the callable which creates the torch module
        :param args: the positional arguments to pass to the factory
        :param cuda: whether CUDA shall be used
        :param kwargs: the keyword arguments to pass to the factory
        """
        super().__init__(cuda)
        self.args = args
        self.kwargs = kwargs
        self.moduleFactory = module_factory

    def create_torch_module(self) -> torch.nn.Module:
        """
        :return: the module obtained by calling the factory with the stored arguments
        """
        create = self.moduleFactory
        return create(*self.args, **self.kwargs)

363 

364 

class TorchModelFromModule(TorchModel):
    """
    A TorchModel which wraps a torch module that is given at construction
    """

    def __init__(self, module: torch.nn.Module, cuda: bool = True):
        """
        :param module: the torch module to wrap
        :param cuda: whether CUDA shall be used
        """
        super().__init__(cuda=cuda)
        self.module = module

    def create_torch_module(self) -> torch.nn.Module:
        """
        :return: the module currently held by this instance (no new module is constructed)
        """
        wrapped_module = self.module
        return wrapped_module

372 

373 

class TorchModelFactoryFromModule:
    """Represents a factory for the creation of a TorchModel based on a torch module"""

    def __init__(self, module: torch.nn.Module, cuda: bool = True):
        """
        :param module: the torch module which created models shall wrap
        :param cuda: the CUDA flag to pass on to created models
        """
        self.module = module
        self.cuda = cuda

    def __call__(self) -> TorchModel:
        """
        :return: a new TorchModelFromModule instance wrapping the stored module
        """
        return TorchModelFromModule(self.module, cuda=self.cuda)

382 

383 

class VectorTorchModel(TorchModel, ABC):
    """
    Base class for TorchModels that can be used within VectorModels, where the input and output dimensions
    are determined by the data
    """

    def __init__(self, cuda: bool = True) -> None:
        """
        :param cuda: whether CUDA shall be used
        """
        super().__init__(cuda=cuda)
        # both dimensions remain unknown until they are extracted from the training data
        self.inputDim = None
        self.outputDim = None

    def _extract_params_from_data(self, data: TorchDataSetProvider) -> None:
        super()._extract_params_from_data(data)
        self.inputDim, self.outputDim = data.get_input_dim(), data.get_model_output_dim()

    def create_torch_module(self) -> torch.nn.Module:
        """
        :return: the module created by :meth:`create_torch_module_for_dims` for the stored dimensions
        """
        input_dim, output_dim = self.inputDim, self.outputDim
        return self.create_torch_module_for_dims(input_dim, output_dim)

    @abstractmethod
    def create_torch_module_for_dims(self, input_dim: int, output_dim: int) -> torch.nn.Module:
        """
        :param input_dim: the number of input dimensions as reported by the data set provider (number of columns
            in input data frame for default providers)
        :param output_dim: the number of output dimensions as reported by the data set provider (for default providers,
            this will be the number of columns in the output data frame or, for classification, the number of classes)
        :return: the torch module
        """

412 

413 

class TorchAutoregressiveResultHandler(ABC):
    """
    Supports the saving of prediction results such that subsequent predictions
    can build on earlier predictions, thus supporting autoregressive models.
    """

    @abstractmethod
    def clear_results(self):
        """
        Discards the results that were previously saved via :meth:`save_results`.
        """

    @abstractmethod
    def save_results(self, input_df: pd.DataFrame, results: np.ndarray) -> None:
        """
        Saves the regression results such that they can be used as input for subsequent prediction steps.
        The input will typically be processed by a feature generator or vectoriser, so the result
        should be stored in a place from which the respective feature generator or vectoriser can retrieve it.

        :param input_df: the input data frame for which results were obtained (number of rows corresponds to
            length of `results`)
        :param results: the results array, which is typically a 2D array where `results[i]` is an array
            containing the results for the i-th input row
        """

437 

438 

# Type variable for the fluent with_* methods of TorchVectorRegressionModel, which are annotated to return the type of self.
TTorchVectorRegressionModel = typing.TypeVar("TTorchVectorRegressionModel", bound="TorchVectorRegressionModel")

440 

441 

class TorchVectorRegressionModel(VectorRegressionModel):
    """
    Base class for the implementation of VectorRegressionModels based on TorchModels.
    An instance of this class will have an instance of TorchModel as the underlying model.
    """

    def __init__(self, torch_model_factory: Callable[[], TorchModel],
            normalisation_mode: NormalisationMode = NormalisationMode.NONE,
            nn_optimiser_params: Union[dict, NNOptimiserParams, None] = None) -> None:
        """
        :param torch_model_factory: the factory function with which to create the contained TorchModel instance that the instance is to
            encapsulate. For the instance to be picklable, this cannot be a lambda or locally defined function.
        :param normalisation_mode: the normalisation mode to apply to input data frames
        :param nn_optimiser_params: the parameters to apply in NNOptimiser during training
        """
        super().__init__()

        if nn_optimiser_params is None:
            nn_optimiser_params_instance = NNOptimiserParams()
        else:
            nn_optimiser_params_instance = NNOptimiserParams.from_dict_or_instance(nn_optimiser_params)
        if nn_optimiser_params_instance.loss_evaluator is None:
            nn_optimiser_params_instance.loss_evaluator = NNLossEvaluatorRegression(NNLossEvaluatorRegression.LossFunction.MSELOSS)

        self.torch_model_factory = torch_model_factory
        self.normalisationMode = normalisation_mode
        self.nnOptimiserParams = nn_optimiser_params_instance
        self.model: Optional[TorchModel] = None
        self.inputTensoriser: Optional[Tensoriser] = None
        self.outputTensoriser: Optional[Tensoriser] = None
        self.outputTensorToArrayConverter: Optional[OutputTensorToArrayConverter] = None
        self.torchDataSetProviderFactory: Optional[TorchDataSetProviderFactory] = None
        self.dataFrameSplitter: Optional[DataFrameSplitter] = None
        self._normalisationCheckThreshold = 5
        self.inferenceBatchSize: Optional[int] = None
        self.autoregressiveResultHandler: Optional[TorchAutoregressiveResultHandler] = None

    def __setstate__(self, state) -> None:
        if "modelClass" in state:  # old-style factory
            state["torch_model_factory"] = functools.partial(state["modelClass"], *state["modelArgs"], **state["modelKwArgs"])
            for k in ("modelClass", "modelArgs", "modelKwArgs"):
                del state[k]
        state["nnOptimiserParams"] = NNOptimiserParams.from_dict_or_instance(state["nnOptimiserParams"])
        # the names must match the attributes assigned in __init__ exactly, as they are read unconditionally
        # (e.g. autoregressiveResultHandler during prediction)
        new_optional_members = ["inputTensoriser", "torchDataSetProviderFactory", "dataFrameSplitter", "outputTensoriser",
            "outputTensorToArrayConverter", "inferenceBatchSize", "autoregressiveResultHandler"]
        new_default_properties = {"_normalisationCheckThreshold": 5}
        setstate(TorchVectorRegressionModel, self, state, new_optional_properties=new_optional_members,
            new_default_properties=new_default_properties)

    @classmethod
    def from_module(cls, module: torch.nn.Module, cuda=True, normalisation_mode: NormalisationMode = NormalisationMode.NONE,
            nn_optimiser_params: Optional[NNOptimiserParams] = None) -> "TorchVectorRegressionModel":
        """
        :param module: the torch module which the underlying TorchModel shall wrap
        :param cuda: whether CUDA shall be used
        :param normalisation_mode: the normalisation mode to apply to input data frames
        :param nn_optimiser_params: the parameters to apply in NNOptimiser during training
        :return: the model instance
        """
        return cls(TorchModelFactoryFromModule(module=module, cuda=cuda), normalisation_mode=normalisation_mode,
            nn_optimiser_params=nn_optimiser_params)

    def _tostring_excludes(self) -> List[str]:
        excludes = super()._tostring_excludes()
        if self.model is not None:
            return excludes + ["modelClass", "modelArgs", "modelKwArgs"]
        else:
            return excludes

    def with_input_tensoriser(self: TTorchVectorRegressionModel, tensoriser: Tensoriser) -> TTorchVectorRegressionModel:
        """
        :param tensoriser: tensoriser to use in order to convert input data frames to (one or more) tensors.
            The default tensoriser directly converts the data frame's values (which is assumed to contain only scalars that
            can be coerced to floats) to a float tensor.
            The use of a custom tensoriser is necessary if a non-trivial conversion is necessary or if the data frame
            is to be converted to more than one input tensor.
        :return: self
        """
        self.inputTensoriser = tensoriser
        return self

    def with_output_tensoriser(self: TTorchVectorRegressionModel, tensoriser: RuleBasedTensoriser) -> TTorchVectorRegressionModel:
        """
        :param tensoriser: tensoriser to use in order to convert the output data frame to a tensor.
            The default output tensoriser directly converts the data frame's values to a float tensor.

            NOTE: It is required to be a rule-based tensoriser, because mechanisms that require fitting on the data
            and thus perform a data-dependent conversion are likely to cause problems because they would need
            to be reversed at inference time (since the model will be trained on the converted values). If you require
            a transformation, use a target transformer, which will be applied before the tensoriser.
        :return: self
        """
        self.outputTensoriser = tensoriser
        return self

    def with_output_tensor_to_array_converter(self: TTorchVectorRegressionModel,
            output_tensor_to_array_converter: "OutputTensorToArrayConverter") -> TTorchVectorRegressionModel:
        """
        Configures the use of a custom converter from tensors to numpy arrays, which is applied during inference.
        A custom converter can be required, for example, to handle variable-length outputs (where the output tensor
        will typically contain unwanted padding). Note that since the converter is for inference only, it may be
        required to use a custom loss evaluator during training if the use of a custom converter is necessary.

        :param output_tensor_to_array_converter: the converter
        :return: self
        """
        self.outputTensorToArrayConverter = output_tensor_to_array_converter
        return self

    def with_torch_data_set_provider_factory(self: TTorchVectorRegressionModel,
            torch_data_set_provider_factory: "TorchDataSetProviderFactory") -> TTorchVectorRegressionModel:
        """
        :param torch_data_set_provider_factory: the torch data set provider factory, which is used to instantiate the provider which
            will provide the training and validation data sets from the input data frame that is passed in for learning.
            By default, TorchDataSetProviderFactoryRegressionDefault is used.
        :return: self
        """
        self.torchDataSetProviderFactory = torch_data_set_provider_factory
        return self

    def with_data_frame_splitter(self: TTorchVectorRegressionModel, data_frame_splitter: DataFrameSplitter) -> TTorchVectorRegressionModel:
        """
        :param data_frame_splitter: the data frame splitter which is used to split the input/output data frames that are passed for
            learning into a data frame that is used for training and a data frame that is used for validation.
            The input data frame is the data frame that is passed as input to the splitter, and the returned indices
            are used to split both the input and output data frames in the same way.
        :return: self
        """
        self.dataFrameSplitter = data_frame_splitter
        return self

    def with_normalisation_check_threshold(self: TTorchVectorRegressionModel, threshold: Optional[float]) -> TTorchVectorRegressionModel:
        """
        Defines a threshold with which to check inputs that are passed to the underlying neural network.
        Whenever an (absolute) input value exceeds the threshold, a warning is triggered.

        :param threshold: the threshold
        :return: self
        """
        self._normalisationCheckThreshold = threshold
        return self

    def with_autoregressive_result_handler(self: TTorchVectorRegressionModel,
            result_handler: TorchAutoregressiveResultHandler,
            inference_batch_size=1) -> TTorchVectorRegressionModel:
        """
        Adds a result handler which can be used to store prediction results such that subsequent predictions
        can use the prediction result, supporting autoregressive models.
        The autoregressive predictions are assumed to be handled in a single call to method :meth:`predict`,
        and the results will be stored for the duration of the call.
        For autoregressive predictions that build on earlier predictions, we must typically restrict
        the batch size such that predictions from the earlier batch can be saved and correctly reused
        as input for the subsequent predictions. The model's input preprocessors (such as feature generators
        or vectorisers) must ensure that the results being stored by the result handler are appropriately
        used as input.

        :param result_handler: the result handler
        :param inference_batch_size: the batch size to use for predictions
        :return: self
        """
        self.autoregressiveResultHandler = result_handler
        self.inferenceBatchSize = inference_batch_size
        return self

    def _create_torch_model(self) -> TorchModel:
        torch_model = self.torch_model_factory()
        torch_model.set_normalisation_check_threshold(self._normalisationCheckThreshold)
        return torch_model

    def _create_data_set_provider(self, inputs: pd.DataFrame, outputs: pd.DataFrame) -> TorchDataSetProvider:
        factory = self.torchDataSetProviderFactory
        if factory is None:
            factory = TorchDataSetProviderFactoryRegressionDefault()
        return factory.create_data_set_provider(inputs, outputs, self, self._trainingContext, input_tensoriser=self.inputTensoriser,
            output_tensoriser=self.outputTensoriser, data_frame_splitter=self.dataFrameSplitter)

    def _fit(self, inputs: pd.DataFrame, outputs: pd.DataFrame) -> None:
        if self.inputTensoriser is not None:
            log.info(f"Fitting {self.inputTensoriser} ...")
            self.inputTensoriser.fit(inputs, model=self)
        self.model = self._create_torch_model()
        data_set_provider = self._create_data_set_provider(inputs, outputs)
        self.model.fit(data_set_provider, self.nnOptimiserParams)

    def _predict_outputs_for_input_data_frame(self, inputs: pd.DataFrame) -> np.ndarray:
        """
        Applies the underlying model batch-wise to the given inputs.
        If an autoregressive result handler is configured, the results of each batch are passed to it
        before the next batch is processed, and its stored results are cleared before and after prediction.

        :param inputs: the input data frame
        :return: the concatenated results of all batches
        """
        result_handler = self.autoregressiveResultHandler
        # must be dynamically tensorised to allow inclusion of predicted results
        tensorise_dynamically = result_handler is not None
        if result_handler is not None:
            result_handler.clear_results()
        batch_size = self.nnOptimiserParams.batch_size if self.inferenceBatchSize is None else self.inferenceBatchSize
        results: List[np.ndarray] = []
        try:
            data_set = TorchDataSetFromDataFrames(inputs, None, self.model.cuda, input_tensoriser=self.inputTensoriser,
                tensorise_dynamically=tensorise_dynamically)
            start_idx = 0
            for input_batch in data_set.iter_batches(batch_size, input_only=True):
                if self.outputTensorToArrayConverter is None:
                    result = self.model.apply_scaled(input_batch, as_numpy=True)
                else:
                    output_batch = self.model.apply_scaled(input_batch, as_numpy=False)
                    result = self.outputTensorToArrayConverter.convert(output_batch, input_batch)
                if result_handler is not None:
                    result_handler.save_results(inputs.iloc[start_idx:start_idx+len(result)], result)
                    start_idx += len(result)
                results.append(result)
        finally:
            # stored results are valid for the duration of this call only, even if prediction fails
            if result_handler is not None:
                result_handler.clear_results()
        return np.concatenate(results)

    def _predict(self, inputs: pd.DataFrame) -> pd.DataFrame:
        y_array = self._predict_outputs_for_input_data_frame(inputs)
        return pd.DataFrame(y_array, columns=self.get_model_output_variable_names())

646 

647 

# Type variable bound to TorchVectorClassificationModel (counterpart of TTorchVectorRegressionModel).
TTorchVectorClassificationModel = typing.TypeVar("TTorchVectorClassificationModel", bound="TorchVectorClassificationModel")

649 

650 

class TorchVectorClassificationModel(VectorClassificationModel):
    """
    Base class for VectorClassificationModel implementations that are backed by a TorchModel.

    The wrapped TorchModel is stored in attribute ``model``. It is None after construction and is
    created via the configured factory when the classifier is fitted.
    """
    def __init__(self, output_mode: ClassificationOutputMode,
            torch_model_factory: Callable[[], TorchModel],
            normalisation_mode: NormalisationMode = NormalisationMode.NONE,
            nn_optimiser_params: Optional[NNOptimiserParams] = None) -> None:
        """
        :param output_mode: specifies the nature of the output of the underlying neural network model
        :param torch_model_factory: the factory function used to create the TorchModel instance that this instance
            encapsulates. For the instance to be picklable, this cannot be a lambda or locally defined function.
        :param normalisation_mode: the normalisation mode to apply to input data frames
        :param nn_optimiser_params: the parameters to apply in NNOptimiser during training; if None, default parameters
            are used. If no loss evaluator is set in the parameters, one is set (on the given object) based on the
            default loss function for the given output mode.
        """
        super().__init__()

        optimiser_params = NNOptimiserParams() if nn_optimiser_params is None else nn_optimiser_params
        if optimiser_params.loss_evaluator is None:
            # no loss configured: use the default loss function associated with the output mode
            default_loss_function = NNLossEvaluatorClassification.LossFunction.default_for_output_mode(output_mode)
            optimiser_params.loss_evaluator = NNLossEvaluatorClassification(default_loss_function)

        # NOTE: the order of the attribute assignments is retained deliberately (it determines the order in __dict__)
        self.outputMode = output_mode
        self.torch_model_factory = torch_model_factory
        self.normalisationMode = normalisation_mode
        self.nnOptimiserParams: NNOptimiserParams = optimiser_params
        self.model: Optional[TorchModel] = None
        self.inputTensoriser: Optional[Tensoriser] = None
        self.outputTensoriser: Optional[Tensoriser] = None
        self.torchDataSetProviderFactory: Optional[TorchDataSetProviderFactory] = None
        self.dataFrameSplitter: Optional[DataFrameSplitter] = None
        self._normalisationCheckThreshold = 5

    # noinspection DuplicatedCode
    def __setstate__(self, state) -> None:
        """
        Restores the instance from the given state dictionary, migrating legacy representations:
        an old-style factory (model class with positional and keyword arguments) is converted to a
        ``functools.partial`` stored as ``torch_model_factory``, and the optimiser parameters are
        passed through ``NNOptimiserParams.from_dict_or_instance``.
        Members that were added in later versions are declared to ``setstate`` as optional properties
        or as properties with default values.

        :param state: the state dictionary (which is modified in place)
        """
        legacy_factory_keys = ("modelClass", "modelArgs", "modelKwArgs")
        if "modelClass" in state:  # old-style factory
            model_class, model_args, model_kwargs = (state[key] for key in legacy_factory_keys)
            state["torch_model_factory"] = functools.partial(model_class, *model_args, **model_kwargs)
            for key in legacy_factory_keys:
                del state[key]
        state["nnOptimiserParams"] = NNOptimiserParams.from_dict_or_instance(state["nnOptimiserParams"])
        setstate(
            TorchVectorClassificationModel, self, state,
            new_optional_properties=["inputTensoriser", "torchDataSetProviderFactory", "dataFrameSplitter", "outputTensoriser"],
            new_default_properties={"outputMode": ClassificationOutputMode.PROBABILITIES, "_normalisationCheckThreshold": 5})

    @classmethod
    def from_module(cls, module: torch.nn.Module, output_mode: ClassificationOutputMode, cuda=True,
            normalisation_mode: NormalisationMode = NormalisationMode.NONE,
            nn_optimiser_params: Optional[NNOptimiserParams] = None) -> "TorchVectorClassificationModel":
        """
        Creates a model instance for the given torch module by wrapping the module in a TorchModelFactoryFromModule.

        :param module: the torch module
        :param output_mode: specifies the nature of the module's output
        :param cuda: flag which is passed on to the TorchModelFactoryFromModule
        :param normalisation_mode: the normalisation mode to apply to input data frames
        :param nn_optimiser_params: the parameters to apply in NNOptimiser during training
        :return: the new instance
        """
        model_factory = TorchModelFactoryFromModule(module, cuda=cuda)
        return cls(output_mode, model_factory, normalisation_mode=normalisation_mode, nn_optimiser_params=nn_optimiser_params)

    def with_input_tensoriser(self: TTorchVectorClassificationModel, tensoriser: Tensoriser) -> TTorchVectorClassificationModel:
        """
        :param tensoriser: tensoriser to use in order to convert input data frames to (one or more) tensors.
            The default tensoriser directly converts the data frame's values (which are assumed to contain only
            scalars that can be coerced to floats) to a float tensor.
            A custom tensoriser is required if a non-trivial conversion is necessary or if the data frame
            is to be converted to more than one input tensor.
        :return: self
        """
        self.inputTensoriser = tensoriser
        return self

    def with_output_tensoriser(self: TTorchVectorClassificationModel, tensoriser: RuleBasedTensoriser) -> TTorchVectorClassificationModel:
        """
        :param tensoriser: tensoriser to use in order to convert the output data frame to a tensor.
            NOTE: It is required to be a rule-based tensoriser, because mechanisms that require fitting on the data
            and thus perform a data-dependent conversion are likely to cause problems: they would need
            to be reversed at inference time (since the model will be trained on the converted values). If you require
            a transformation, use a target transformer, which will be applied before the tensoriser.
        :return: self
        """
        self.outputTensoriser = tensoriser
        return self

    def with_torch_data_set_provider_factory(self: TTorchVectorClassificationModel,
            torch_data_set_provider_factory: "TorchDataSetProviderFactory") -> TTorchVectorClassificationModel:
        """
        :param torch_data_set_provider_factory: the torch data set provider factory, which is used to instantiate the
            provider that supplies the training and validation data sets from the data frames passed in for learning.
            By default, TorchDataSetProviderFactoryClassificationDefault is used.
        :return: self
        """
        self.torchDataSetProviderFactory = torch_data_set_provider_factory
        return self

    def with_data_frame_splitter(self: TTorchVectorClassificationModel, data_frame_splitter: DataFrameSplitter) \
            -> TTorchVectorClassificationModel:
        """
        :param data_frame_splitter: the data frame splitter which is used to split the input/output data frames that
            are passed for learning into a data frame used for training and a data frame used for validation.
            The input data frame is the data frame that is passed as input to the splitter, and the returned indices
            are used to split both the input and output data frames in the same way.
        :return: self
        """
        self.dataFrameSplitter = data_frame_splitter
        return self

    def with_normalisation_check_threshold(self: TTorchVectorClassificationModel, threshold: Optional[float]) \
            -> TTorchVectorClassificationModel:
        """
        Defines a threshold with which to check inputs that are passed to the underlying neural network.
        Whenever an (absolute) input value exceeds the threshold, a warning is triggered.

        :param threshold: the threshold
        :return: self
        """
        self._normalisationCheckThreshold = threshold
        return self

    def _create_torch_model(self) -> TorchModel:
        """
        :return: a new TorchModel obtained from the factory, with this instance's normalisation check threshold applied
        """
        new_model = self.torch_model_factory()
        new_model.set_normalisation_check_threshold(self._normalisationCheckThreshold)
        return new_model

    def _create_data_set_provider(self, inputs: pd.DataFrame, outputs: pd.DataFrame) -> TorchDataSetProvider:
        """
        Creates the data set provider for training, using the configured provider factory or, if none
        was configured, a TorchDataSetProviderFactoryClassificationDefault.

        :param inputs: the input data frame
        :param outputs: the output data frame
        :return: the data set provider
        """
        configured_factory = self.torchDataSetProviderFactory
        provider_factory = TorchDataSetProviderFactoryClassificationDefault() if configured_factory is None else configured_factory
        return provider_factory.create_data_set_provider(inputs, outputs, self, self._trainingContext,
            input_tensoriser=self.inputTensoriser, output_tensoriser=self.outputTensoriser,
            data_frame_splitter=self.dataFrameSplitter)

    def _fit_classifier(self, inputs: pd.DataFrame, outputs: pd.DataFrame) -> None:
        """
        Fits the input tensoriser (if any), replaces the class labels in the outputs by their indices
        in the list of labels, creates the underlying torch model and trains it.

        :param inputs: the input data frame
        :param outputs: the output data frame, which must have exactly one column (containing the class labels)
        :raises ValueError: if ``outputs`` does not have exactly one column
        """
        if len(outputs.columns) != 1:
            raise ValueError("Expected one output dimension: the class labels")

        input_tensoriser = self.inputTensoriser
        if input_tensoriser is not None:
            log.info(f"Fitting {self.inputTensoriser} ...")
            input_tensoriser.fit(inputs, model=self)

        # transform outputs: for each data point, the new output shall be the index in the list of labels
        label_column: pd.Series = outputs.iloc[:, 0]
        label_indices = [self._labels.index(label) for label in label_column]
        outputs = pd.DataFrame(label_indices, columns=outputs.columns, index=outputs.index)

        self.model = self._create_torch_model()

        provider = self._create_data_set_provider(inputs, outputs)
        self.model.fit(provider, self.nnOptimiserParams)

    def _predict_outputs_for_input_data_frame(self, inputs: pd.DataFrame) -> torch.Tensor:
        """
        Applies the underlying model to the given inputs batch-wise, using the batch size from the optimiser parameters.

        :param inputs: the input data frame
        :return: the model outputs for all batches, concatenated along dimension 0
        """
        data_set = TorchDataSetFromDataFrames(inputs, None, self.model.cuda, input_tensoriser=self.inputTensoriser)
        input_batches = data_set.iter_batches(self.nnOptimiserParams.batch_size, input_only=True)
        batch_outputs = [self.model.apply_scaled(input_batch, as_numpy=False) for input_batch in input_batches]
        return torch.cat(batch_outputs, dim=0)

    def _predict_class_probabilities(self, inputs: pd.DataFrame) -> pd.DataFrame:
        """
        Computes class probabilities by converting the raw model output according to the output mode:
        probabilities are used as is, log-probabilities are exponentiated and unnormalised log-probabilities
        are passed through a softmax (along dimension 1).

        :param inputs: the input data frame
        :return: a data frame with one column per class label
        :raises ValueError: if the output mode is not one of the handled modes
        """
        y = self._predict_outputs_for_input_data_frame(inputs)
        mode = self.outputMode
        if mode == ClassificationOutputMode.LOG_PROBABILITIES:
            y = y.exp()
        elif mode == ClassificationOutputMode.UNNORMALISED_LOG_PROBABILITIES:
            y = y.softmax(dim=1)
        elif mode != ClassificationOutputMode.PROBABILITIES:
            raise ValueError(f"Unhandled output mode {self.outputMode}")
        return pd.DataFrame(y.numpy(), columns=self._labels)

    def _tostring_excludes(self) -> List[str]:
        """
        :return: the names of the attributes to exclude from the string representation; once a model is present,
            the (old-style) factory attributes are excluded in addition to the excludes of the super-class
        """
        base_excludes = super()._tostring_excludes()
        if self.model is None:
            return base_excludes
        return base_excludes + ["modelClass", "modelArgs", "modelKwArgs"]

818 

819 

class TorchDataSetProviderFactory(ABC):
    """
    Abstract factory for the creation of the TorchDataSetProvider that supplies a torch-based
    vector model with its data sets.
    """
    @abstractmethod
    def create_data_set_provider(self, inputs: pd.DataFrame, outputs: pd.DataFrame,
            model: Union[TorchVectorRegressionModel, TorchVectorClassificationModel],
            training_context: TrainingContext,
            input_tensoriser: Optional[Tensoriser],
            output_tensoriser: Optional[Tensoriser],
            data_frame_splitter: Optional[DataFrameSplitter]) -> TorchDataSetProvider:
        """
        Creates the data set provider for the given data.

        :param inputs: the input data frame
        :param outputs: the output data frame
        :param model: the vector model for which the provider is created
        :param training_context: the training context
        :param input_tensoriser: the tensoriser for the input data frame (if any)
        :param output_tensoriser: the tensoriser for the output data frame (if any)
        :param data_frame_splitter: the data frame splitter (if any)
        :return: the data set provider
        """

831 

832 

class TorchDataSetProviderFactoryClassificationDefault(TorchDataSetProviderFactory):
    """
    Default data set provider factory for classification models, which builds the provider
    from a ClassificationVectorDataUtil.
    """
    def __init__(self, tensorise_dynamically=False):
        """
        :param tensorise_dynamically: whether tensorisation shall take place on the fly whenever the provided data sets
            are iterated; if False, tensorisation takes place once in a precomputation stage (tensors must jointly
            fit into memory)
        """
        self.tensoriseDynamically = tensorise_dynamically

    def create_data_set_provider(self,
            inputs: pd.DataFrame,
            outputs: pd.DataFrame,
            model: TorchVectorClassificationModel,
            training_context: TrainingContext,
            input_tensoriser: Optional[Tensoriser],
            output_tensoriser: Optional[Tensoriser],
            data_frame_splitter: Optional[DataFrameSplitter]) -> TorchDataSetProvider:
        """
        :param inputs: the input data frame
        :param outputs: the output data frame
        :param model: the classification model, from which the cuda flag of the underlying torch model, the number
            of class labels and the normalisation mode are taken
        :param training_context: the training context (not used by this implementation)
        :param input_tensoriser: the tensoriser for the input data frame (if any)
        :param output_tensoriser: the tensoriser for the output data frame (if any)
        :param data_frame_splitter: the data frame splitter (if any)
        :return: the data set provider
        """
        use_cuda = model.model.cuda
        num_classes = len(model._labels)  # TODO FIXME
        vector_data_util = ClassificationVectorDataUtil(inputs, outputs, use_cuda, num_classes,
            normalisation_mode=model.normalisationMode,
            input_tensoriser=input_tensoriser,
            output_tensoriser=output_tensoriser,
            data_frame_splitter=data_frame_splitter)
        return TorchDataSetProviderFromVectorDataUtil(vector_data_util, use_cuda, tensorise_dynamically=self.tensoriseDynamically)

853 

854 

class TorchDataSetProviderFactoryRegressionDefault(TorchDataSetProviderFactory):
    """
    Default data set provider factory for regression models, which builds the provider
    from a VectorDataUtil.
    """
    def __init__(self, tensorise_dynamically=False):
        """
        :param tensorise_dynamically: whether tensorisation shall take place on the fly whenever the provided data sets
            are iterated; if False, tensorisation takes place once in a precomputation stage (tensors must jointly
            fit into memory)
        """
        self.tensoriseDynamically = tensorise_dynamically

    def create_data_set_provider(self, inputs: pd.DataFrame, outputs: pd.DataFrame, model: TorchVectorRegressionModel,
            training_context: TrainingContext, input_tensoriser: Optional[Tensoriser], output_tensoriser: Optional[Tensoriser],
            data_frame_splitter: Optional[DataFrameSplitter]) -> TorchDataSetProvider:
        """
        :param inputs: the input data frame
        :param outputs: the output data frame
        :param model: the regression model, from which the cuda flag of the underlying torch model and the
            normalisation mode are taken
        :param training_context: the training context (not used by this implementation)
        :param input_tensoriser: the tensoriser for the input data frame (if any)
        :param output_tensoriser: the tensoriser for the output data frame (if any)
        :param data_frame_splitter: the data frame splitter (if any)
        :return: the data set provider
        """
        use_cuda = model.model.cuda
        vector_data_util = VectorDataUtil(inputs, outputs, use_cuda,
            normalisation_mode=model.normalisationMode,
            input_tensoriser=input_tensoriser,
            output_tensoriser=output_tensoriser,
            data_frame_splitter=data_frame_splitter)
        return TorchDataSetProviderFromVectorDataUtil(vector_data_util, use_cuda, tensorise_dynamically=self.tensoriseDynamically)

869 

870 

class OutputTensorToArrayConverter(ABC):
    """
    Abstraction for the conversion of a model's output tensor to a numpy array.
    """
    @abstractmethod
    def convert(self, model_output: torch.Tensor, model_input: Union[torch.Tensor, Sequence[torch.Tensor]]) -> np.ndarray:
        """
        Converts the given output tensor to an array.

        :param model_output: the output tensor generated by the model
        :param model_input: the input tensor(s) for which the model produced the output (which may provide relevant meta-data)
        :return: a numpy array of shape (N, D) where N=model_output.shape[0] is the number of data points and D is the
            number of variables predicted by the model
        """