Source code for sensai.torch.torch_opt

import enum
import functools
import logging
import math
import time
from abc import ABC, abstractmethod
from collections import OrderedDict
from enum import Enum
from typing import List, Union, Sequence, Callable, TYPE_CHECKING, Tuple, Optional, Dict, Any

import matplotlib.figure
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from matplotlib import pyplot as plt
from torch import cuda as torchcuda

from .torch_data import TensorScaler, DataUtil, TorchDataSet, TorchDataSetProviderFromDataUtil, TorchDataSetProvider, \
    TensorScalerIdentity, TensorTransformer
from .torch_enums import ClassificationOutputMode
from ..util.string import ToStringMixin

if TYPE_CHECKING:
    from .torch_base import TorchModel

log = logging.getLogger(__name__)


class Optimiser(enum.Enum):
    """
    Enumeration of the supported torch optimisers.

    The value of each member is a pair ``(name, constructor)``, where ``name`` is the lower-case
    identifier accepted by :meth:`from_name` and ``constructor`` is the corresponding class
    from ``torch.optim``.
    """
    SGD = ("sgd", optim.SGD)
    ASGD = ("asgd", optim.ASGD)
    ADAGRAD = ("adagrad", optim.Adagrad)
    ADADELTA = ("adadelta", optim.Adadelta)
    ADAM = ("adam", optim.Adam)
    ADAMW = ("adamw", optim.AdamW)
    ADAMAX = ("adamax", optim.Adamax)
    RMSPROP = ("rmsprop", optim.RMSprop)
    RPROP = ("rprop", optim.Rprop)
    LBFGS = ("lbfgs", optim.LBFGS)

    @classmethod
    def from_name(cls, name: str) -> "Optimiser":
        """
        Looks up an optimiser by its name (case-insensitive).

        :param name: the name of the optimiser, e.g. "adam"
        :return: the corresponding enum member
        :raises ValueError: if no optimiser with the given name exists
        """
        lname = name.lower()
        for o in cls:
            if o.value[0] == lname:
                return o
        raise ValueError(f"Unknown optimiser name '{name}'; known names: {[o.value[0] for o in cls]}")

    @classmethod
    def from_name_or_instance(cls, name_or_instance: Union[str, "Optimiser"]) -> "Optimiser":
        """
        Resolves the given argument to an enum member.

        :param name_or_instance: either the name of an optimiser (see :meth:`from_name`) or an enum member
        :return: the enum member; anything that is not a string is returned unchanged
        :raises ValueError: if a string is given which does not name a known optimiser
        """
        # isinstance (rather than an exact type comparison) also accepts str subclasses such as numpy.str_
        if isinstance(name_or_instance, str):
            return cls.from_name(name_or_instance)
        return name_or_instance
class _Optimiser:
    """
    Wraps an optimiser from ``torch.optim``, optionally clipping the gradient norm in every step.
    """
    def __init__(self, params, method: Union[str, Optimiser], lr, max_grad_norm, use_shrinkage=True, **optimiser_args):
        """
        :param params: an iterable of tensors or dicts specifying what is to be optimised
        :param method: the optimiser to use (name or enum member)
        :param lr: the learning rate
        :param max_grad_norm: the gradient norm beyond which gradients are clipped (if shrinkage is used)
        :param use_shrinkage: whether to clip the gradient norm; always disabled for L-BFGS
        :param optimiser_args: keyword arguments to pass on to the torch optimiser's constructor
        """
        self.method = Optimiser.from_name_or_instance(method)
        # materialise right away, since params may be a generator
        self.params = list(params)
        self.last_ppl = None
        self.lr = lr
        self.max_grad_norm = max_grad_norm
        self.start_decay = False
        self.optimiserArgs = optimiser_args
        self.use_shrinkage = use_shrinkage

        # the learning rate is passed via the constructor's keyword arguments
        constructor_args = {**self.optimiserArgs, 'lr': self.lr}
        is_lbfgs = self.method == Optimiser.LBFGS
        if is_lbfgs:
            self.use_shrinkage = False
        factory = optim.LBFGS if is_lbfgs else self.method.value[1]
        self.optimizer = factory(self.params, **constructor_args)

    def step(self, loss_backward: Callable):
        """
        Performs a single optimisation step.

        :param loss_backward: callable which performs the backward step and returns the loss
        :return: the value returned by the wrapped optimiser's ``step`` method
        """
        if not self.use_shrinkage:
            return self.optimizer.step(loss_backward)

        def clipping_closure():
            value = loss_backward()
            torch.nn.utils.clip_grad_norm_(self.params, self.max_grad_norm)
            return value

        return self.optimizer.step(clipping_closure)
class NNLossEvaluator(ABC):
    """
    Interface for the evaluation of training loss and validation metrics.
    """

    class Evaluation(ABC):
        """
        Stateful object which aggregates the training loss and the validation metrics of an epoch.
        """

        @abstractmethod
        def start_epoch(self) -> None:
            """
            Begins a new epoch, resetting all aggregated values that are needed to eventually report the
            epoch's overall training loss (:meth:`get_epoch_train_loss`) and its validation metrics
            (:meth:`get_validation_metrics`).
            """

        @abstractmethod
        def compute_train_batch_loss(self, model_output, ground_truth, x, y) -> torch.Tensor:
            """
            Computes the loss for one training batch and aggregates it such that
            :meth:`get_epoch_train_loss` can report a result for the whole epoch.
            The original batch tensors x and y are passed as meta-information only.

            :param model_output: the model output
            :param ground_truth: the ground truth values
            :param x: the original batch input tensor
            :param y: the original batch output (ground truth) tensor
            :return: the loss (scalar tensor)
            """

        @abstractmethod
        def get_epoch_train_loss(self) -> float:
            """
            :return: the epoch's overall training loss, derived from the batch data that was passed to
                :meth:`compute_train_batch_loss`
            """

        @abstractmethod
        def process_validation_batch(self, model_output, ground_truth, x, y) -> None:
            """
            Processes the model outputs and ground truth values of one validation batch, collecting
            sufficient statistics for the validation metrics which, at the end of the epoch, are to be
            retrievable via :meth:`get_validation_metrics`.

            :param model_output: the model output
            :param ground_truth: the ground truth values
            :param x: the original batch input tensor
            :param y: the original batch output (ground truth) tensor
            """

        @abstractmethod
        def get_validation_metrics(self) -> Dict[str, float]:
            """
            :return: a dictionary mapping metric names to metric values
            """

    @abstractmethod
    def start_evaluation(self, cuda: bool) -> Evaluation:
        """
        Starts the evaluation of a model, returning the (stateful) object that carries out the
        required computations.

        :param cuda: whether CUDA is being applied (all tensors/models on the GPU)
        :return: the evaluation object
        """

    @abstractmethod
    def get_validation_metric_name(self) -> str:
        """
        :return: the name of the validation metric that determines the best model (a key of the
            dictionary returned by :meth:`Evaluation.get_validation_metrics`)
        """
class NNLossEvaluatorFixedDim(NNLossEvaluator, ABC):
    """
    Base class defining the interface for training and validation loss evaluation, which uses fixed-dimension
    outputs and aggregates individual training batch losses that are summed losses per batch
    (averaging appropriately internally).
    """

    class Evaluation(NNLossEvaluator.Evaluation):
        """
        Evaluation which applies a criterion to training batches (optionally weighting the output dimensions)
        and delegates the computation of validation metrics to a :class:`ValidationLossEvaluator`.
        """
        def __init__(self, criterion, validation_loss_evaluator: "NNLossEvaluatorFixedDim.ValidationLossEvaluator",
                output_dim_weights: torch.Tensor = None):
            """
            :param criterion: the training criterion (loss function); it is applied to (output, ground truth) pairs
            :param validation_loss_evaluator: the object to which validation batches are forwarded
            :param output_dim_weights: optional weights of the output dimensions; if None, the criterion is
                applied to the entire tensors
            """
            self.output_dim_weights = output_dim_weights
            self.output_dim_weight_sum = torch.sum(output_dim_weights) if output_dim_weights is not None else None
            self.validation_loss_evaluator = validation_loss_evaluator
            self.criterion = criterion
            self.total_loss = None
            self.num_samples = None
            self.num_outputs_per_data_point: Optional[int] = None
            # None indicates that no validation collection is currently in progress
            self.validation_ground_truth_shape = None

        def start_epoch(self):
            """Resets the aggregated training loss and the validation collection state."""
            self.total_loss = 0
            self.num_samples = 0
            self.validation_ground_truth_shape = None

        def compute_train_batch_loss(self, model_output, ground_truth, x, y) -> torch.Tensor:
            """
            Computes the loss for a training batch and adds it to the epoch's aggregated loss.

            :param model_output: the model output
            :param ground_truth: the ground truth values
            :param x: the original batch input tensor (unused)
            :param y: the original batch output tensor; its shape determines the number of data points in the
                batch (first dimension) and the number of outputs per data point (remaining dimensions)
            :return: the loss (scalar tensor); the weighted loss if output dimension weights are used
            """
            if self.num_outputs_per_data_point is None:
                # determined once from the first batch: product of all non-batch dimensions
                output_shape = y.shape[1:]
                self.num_outputs_per_data_point = functools.reduce(lambda a, b: a * b, output_shape, 1)
                assert self.output_dim_weights is None or len(self.output_dim_weights) == self.num_outputs_per_data_point
            num_data_points_in_batch = y.shape[0]
            if self.output_dim_weights is None:
                # treat all dimensions as equal, applying criterion to entire tensors
                loss = self.criterion(model_output, ground_truth)
                self.num_samples += num_data_points_in_batch * self.num_outputs_per_data_point
                self.total_loss += loss.item()
                return loss
            else:
                # compute loss per dimension and return weighted loss
                loss_per_dim = torch.zeros(self.num_outputs_per_data_point, device=model_output.device, dtype=torch.float)
                for o in range(self.num_outputs_per_data_point):
                    loss_per_dim[o] = self.criterion(model_output[:, o], ground_truth[:, o])
                weighted_loss = (loss_per_dim * self.output_dim_weights).sum() / self.output_dim_weight_sum
                self.num_samples += num_data_points_in_batch
                self.total_loss += weighted_loss.item()
                return weighted_loss

        def get_epoch_train_loss(self) -> float:
            """
            :return: the aggregated training loss of the epoch divided by the aggregated sample count
            """
            return self.total_loss / self.num_samples

        def process_validation_batch(self, model_output, ground_truth, x, y):
            """
            Forwards a validation batch to the validation loss evaluator, starting a new validation
            collection beforehand if none is in progress.

            :param model_output: the model output
            :param ground_truth: the ground truth values
            :param x: the original batch input tensor (unused)
            :param y: the original batch output tensor, whose non-batch dimensions define the ground truth shape
            """
            if self.validation_ground_truth_shape is None:
                self.validation_ground_truth_shape = y.shape[1:]  # the shape of the output of a single model application
                self.validation_loss_evaluator.start_validation_collection(self.validation_ground_truth_shape)
            self.validation_loss_evaluator.process_validation_result_batch(model_output, ground_truth)

        def get_validation_metrics(self) -> Dict[str, float]:
            """
            Ends the current validation collection and returns its metrics.

            :return: the metrics computed by the validation loss evaluator
            """
            metrics = self.validation_loss_evaluator.end_validation_collection()
            # Mark the collection as finished, such that the next validation batch starts a fresh collection.
            # Without this, a second validation set evaluated within the same epoch would be aggregated
            # together with the data of the first one.
            self.validation_ground_truth_shape = None
            return metrics

    def start_evaluation(self, cuda: bool) -> Evaluation:
        """
        Creates the evaluation object from the training criterion, the output dimension weights and a
        newly created validation loss evaluator.

        :param cuda: whether CUDA is being applied; if so, the criterion and the weights are moved to the GPU
        :return: the evaluation object
        """
        criterion = self.get_training_criterion()
        output_dim_weights_array = self.get_output_dim_weights()
        output_dim_weights_tensor = torch.from_numpy(output_dim_weights_array).float() if output_dim_weights_array is not None else None
        if cuda:
            criterion = criterion.cuda()
            if output_dim_weights_tensor is not None:
                output_dim_weights_tensor = output_dim_weights_tensor.cuda()
        return self.Evaluation(criterion, self.create_validation_loss_evaluator(cuda), output_dim_weights=output_dim_weights_tensor)

    @abstractmethod
    def get_training_criterion(self) -> nn.Module:
        """
        Gets the optimisation criterion (loss function) for training.
        Standard implementations are available in torch.nn (torch.nn.MSELoss, torch.nn.CrossEntropyLoss, etc.).
        """
        pass

    @abstractmethod
    def get_output_dim_weights(self) -> Optional[np.ndarray]:
        """
        :return: the weights of the output dimensions or None if the dimensions are not to be weighted
        """
        pass

    @abstractmethod
    def create_validation_loss_evaluator(self, cuda: bool) -> "ValidationLossEvaluator":
        """
        :param cuda: whether to use CUDA-based tensors
        :return: the evaluator instance which is to be used to evaluate the model on validation data
        """
        pass

    def get_validation_metric_name(self) -> str:
        """
        Gets the name of the metric (key of dictionary as returned by the validation loss evaluator's
        ``end_validation_collection`` method), which is defining for the quality of the model and thus determines
        which epoch's model is considered the best.

        NOTE: This default implementation returns None; subclasses are expected to override it.

        :return: the name of the metric
        """
        pass

    class ValidationLossEvaluator(ABC):
        """
        Interface for the collection of validation data and the computation of validation metrics.
        """

        @abstractmethod
        def start_validation_collection(self, ground_truth_shape):
            """
            Initiates validation data collection for a new epoch, appropriately resetting this object's internal state.

            :param ground_truth_shape: the tensor shape of a single ground truth data point (not including the batch
                entry dimension)
            """
            pass

        @abstractmethod
        def process_validation_result_batch(self, output, ground_truth):
            """
            Collects, for validation, the given output and ground truth data (tensors holding data on one batch,
            where the first dimension is the batch entry)

            :param output: the model's output
            :param ground_truth: the corresponding ground truth
            """
            pass

        @abstractmethod
        def end_validation_collection(self) -> OrderedDict:
            """
            Computes validation metrics based on the data previously processed.

            :return: an ordered dictionary with validation metrics
            """
            pass
class NNLossEvaluatorRegression(NNLossEvaluatorFixedDim, ToStringMixin):
    """Loss evaluator for (multi-variate) regression problems."""

    class LossFunction(Enum):
        L1LOSS = "L1Loss"
        L2LOSS = "L2Loss"
        MSELOSS = "MSELoss"
        SMOOTHL1LOSS = "SmoothL1Loss"

    def __init__(self, loss_fn: LossFunction = LossFunction.L2LOSS, validation_tensor_transformer: Optional[TensorTransformer] = None,
            output_dim_weights: Sequence[float] = None, apply_output_dim_weights_in_validation=True,
            validation_metric_name: Optional[str] = None):
        """
        :param loss_fn: the loss function to use; None is treated like L2LOSS
        :param validation_tensor_transformer: a transformer which is applied to validation tensors (both model
            outputs and ground truth) before the validation metrics are computed
        :param output_dim_weights: vector of weights for the mean loss per output dimension: if the model produces
            n output dimensions per data point, the mean loss of the i-th dimension is computed separately and
            scaled with the i-th weight; the overall loss is the weighted average.
            The weights need not sum to 1 (normalisation is applied).
        :param apply_output_dim_weights_in_validation: whether the output dimension weights are also applied to
            the validation metrics. Note that this may not be possible if a validation tensor transformer
            is used which changes the output dimensions.
        :param validation_metric_name: the metric to use for model selection during validation; if None,
            a default depending on the loss function is used
        """
        self.validation_tensor_transformer = validation_tensor_transformer
        if output_dim_weights is None:
            self.output_dim_weights = None
        else:
            self.output_dim_weights = np.array(output_dim_weights)
        self.apply_output_dim_weights_in_validation = apply_output_dim_weights_in_validation
        self.validation_metric_name = validation_metric_name

        requested_loss_fn = self.LossFunction.L2LOSS if loss_fn is None else loss_fn
        try:
            self.loss_fn = self.LossFunction(requested_loss_fn)
        except ValueError:
            raise Exception(f"The loss function '{requested_loss_fn}' is not supported. "
                f"Available options are: {[e.value for e in self.LossFunction]}")

    def create_validation_loss_evaluator(self, cuda):
        """
        :param cuda: whether to use CUDA-based tensors
        :return: a new validation loss evaluator configured from this object's settings
        """
        return self.ValidationLossEvaluator(cuda, self.validation_tensor_transformer, self.output_dim_weights,
            self.apply_output_dim_weights_in_validation)

    def get_training_criterion(self):
        """
        :return: the torch criterion (with summed reduction) that corresponds to the configured loss function
        """
        criterion_classes = {
            self.LossFunction.L1LOSS: nn.L1Loss,
            self.LossFunction.L2LOSS: nn.MSELoss,
            self.LossFunction.MSELOSS: nn.MSELoss,
            self.LossFunction.SMOOTHL1LOSS: nn.SmoothL1Loss,
        }
        criterion_class = criterion_classes.get(self.loss_fn)
        if criterion_class is None:
            raise AssertionError(f"Loss function {self.loss_fn} defined but instantiation not implemented.")
        return criterion_class(reduction='sum')

    def get_output_dim_weights(self) -> Optional[np.ndarray]:
        """
        :return: the output dimension weights (None if no weights were given)
        """
        return self.output_dim_weights

    class ValidationLossEvaluator(NNLossEvaluatorFixedDim.ValidationLossEvaluator):
        """
        Collects summed L1 and L2 losses per output dimension along with all ground truth values and
        derives the metrics RRSE, RAE, MSE and MAE from them.
        """
        def __init__(self, cuda: bool, validation_tensor_transformer: Optional[TensorTransformer], output_dim_weights: np.ndarray,
                apply_output_dim_weights: bool):
            """
            :param cuda: whether the loss modules shall be moved to the GPU
            :param validation_tensor_transformer: optional transformer to apply to outputs and ground truth
            :param output_dim_weights: the weights of the output dimensions (may be None)
            :param apply_output_dim_weights: whether to weight the output dimensions when averaging metrics
            """
            self.validationTensorTransformer = validation_tensor_transformer
            self.outputDimWeights = output_dim_weights
            self.applyOutputDimWeights = apply_output_dim_weights
            self.total_loss_l1 = None
            self.total_loss_l2 = None
            self.output_dims = None
            self.allTrueOutputs = None
            l1_module = nn.L1Loss(reduction='sum')
            l2_module = nn.MSELoss(reduction='sum')
            self.evaluate_l1 = l1_module
            self.evaluate_l2 = l2_module
            if cuda:
                self.evaluate_l1 = self.evaluate_l1.cuda()
                self.evaluate_l2 = self.evaluate_l2.cuda()
            self.begin_new_validation_collection: Optional[bool] = None

        def start_validation_collection(self, ground_truth_shape):
            """
            Marks the beginning of a new collection; the state is reset when the first batch is processed.

            :param ground_truth_shape: the shape of a single ground truth data point; must be one-dimensional
            """
            is_vector = len(ground_truth_shape) == 1
            if not is_vector:
                raise ValueError("Outputs that are not vectors are currently unsupported")
            self.begin_new_validation_collection = True

        def process_validation_result_batch(self, output, ground_truth):
            """
            Adds the summed L1/L2 losses of the batch to the per-dimension totals and stores the ground truth.

            :param output: the model's output for the batch
            :param ground_truth: the corresponding ground truth
            """
            transformer = self.validationTensorTransformer
            if transformer is not None:
                output = transformer.transform(output)
                ground_truth = transformer.transform(ground_truth)

            # reset the aggregated state if this is the first batch of a new collection
            if self.begin_new_validation_collection:
                self.output_dims = ground_truth.shape[-1]
                self.total_loss_l1 = np.zeros(self.output_dims)
                self.total_loss_l2 = np.zeros(self.output_dims)
                self.allTrueOutputs = None
                self.begin_new_validation_collection = False

            assert len(output.shape) == 2 and len(ground_truth.shape) == 2

            # one row per output dimension: (batch_size, output_size) -> (output_size, batch_size)
            predictions_by_dim = output.permute(1, 0)
            truth_by_dim = ground_truth.permute(1, 0)

            if self.allTrueOutputs is not None:
                self.allTrueOutputs = torch.cat((self.allTrueOutputs, truth_by_dim), dim=1)
            else:
                self.allTrueOutputs = truth_by_dim

            for dim in range(self.output_dims):
                predicted, actual = predictions_by_dim[dim], truth_by_dim[dim]
                self.total_loss_l1[dim] += self.evaluate_l1(predicted, actual).item()
                self.total_loss_l2[dim] += self.evaluate_l2(predicted, actual).item()

        def end_validation_collection(self):
            """
            Computes the metrics per output dimension and averages them across dimensions
            (using the output dimension weights if these are to be applied).

            :return: an ordered dictionary with the keys "RRSE", "RAE", "MSE" and "MAE"
            """
            num_dims = self.output_dims
            rae, rrse, mae, mse = (np.zeros(num_dims) for _ in range(4))

            for dim in range(num_dims):
                true_values = self.allTrueOutputs[dim]
                # errors of the reference model which always predicts the mean
                deviations = true_values - torch.mean(true_values)
                sum_abs_deviations = torch.sum(torch.abs(deviations)).item()
                sum_squared_deviations = torch.sum(deviations * deviations).item()
                count = deviations.size(0)

                mae[dim] = self.total_loss_l1[dim] / count
                mse[dim] = self.total_loss_l2[dim] / count
                if sum_abs_deviations != 0:
                    rae[dim] = self.total_loss_l1[dim] / sum_abs_deviations
                else:
                    rae[dim] = np.inf
                if sum_squared_deviations != 0:
                    rrse[dim] = np.sqrt(mse[dim]) / np.sqrt(sum_squared_deviations / count)
                else:
                    rrse[dim] = np.inf

            def average_across_dims(values):
                if self.applyOutputDimWeights:
                    return np.average(values, weights=self.outputDimWeights)
                return np.mean(values)

            return OrderedDict([
                ("RRSE", average_across_dims(rrse)),
                ("RAE", average_across_dims(rae)),
                ("MSE", average_across_dims(mse)),
                ("MAE", average_across_dims(mae)),
            ])

    def get_validation_metric_name(self):
        """
        :return: the explicitly configured validation metric name if there is one; otherwise "MAE" for the
            L1-type loss functions and "MSE" for the L2-type loss functions
        """
        if self.validation_metric_name is not None:
            return self.validation_metric_name
        if self.loss_fn is self.LossFunction.L1LOSS or self.loss_fn is self.LossFunction.SMOOTHL1LOSS:
            return "MAE"
        if self.loss_fn is self.LossFunction.L2LOSS or self.loss_fn is self.LossFunction.MSELOSS:
            return "MSE"
        raise AssertionError(f"No validation metric defined as selection criterion for loss function {self.loss_fn}")
class NNLossEvaluatorClassification(NNLossEvaluatorFixedDim):
    """A loss evaluator for classification"""

    class LossFunction(Enum):
        CROSSENTROPY = "CrossEntropy"
        NLL = "NegativeLogLikelihood"

        def create_criterion(self) -> Callable:
            """
            :return: a new torch criterion (with summed reduction) for this loss function
            """
            # members are accessed via the enum class rather than via the member itself
            members = type(self)
            if self is members.CROSSENTROPY:
                return nn.CrossEntropyLoss(reduction='sum')
            elif self is members.NLL:
                return nn.NLLLoss(reduction="sum")
            raise AssertionError(f"No criterion defined for loss function {self}")

        def get_validation_metric_key(self) -> str:
            """
            :return: the key of the validation metric which corresponds to this loss function
            """
            members = type(self)
            if self is members.CROSSENTROPY:
                return "CE"
            elif self is members.NLL:
                return "NLL"
            raise AssertionError(f"No validation metric key defined for loss function {self}")

        @classmethod
        def default_for_output_mode(cls, output_mode: ClassificationOutputMode):
            """
            :param output_mode: the output mode of the classification model
            :return: the loss function to use by default for the given output mode
            :raises ValueError: if no loss function is available for the given output mode
            """
            if output_mode == ClassificationOutputMode.PROBABILITIES:
                raise ValueError(f"No loss function available for {output_mode}; Either apply log at the end and use "
                    f"{ClassificationOutputMode.LOG_PROBABILITIES} or use a different final activation (e.g. log_softmax) "
                    f"to avoid this type of output.")
            elif output_mode == ClassificationOutputMode.LOG_PROBABILITIES:
                return cls.NLL
            elif output_mode == ClassificationOutputMode.UNNORMALISED_LOG_PROBABILITIES:
                return cls.CROSSENTROPY
            else:
                raise ValueError(f"No default specified for {output_mode}")

    def __init__(self, loss_fn: LossFunction):
        """
        :param loss_fn: the loss function to use (enum member or its value)
        """
        self.lossFn: "NNLossEvaluatorClassification.LossFunction" = self.LossFunction(loss_fn)

    def __str__(self):
        return f"{self.__class__.__name__}[{self.lossFn}]"

    def create_validation_loss_evaluator(self, cuda):
        """
        :param cuda: whether to use CUDA-based tensors
        :return: a new validation loss evaluator for the configured loss function
        """
        return self.ValidationLossEvaluator(cuda, self.lossFn)

    def get_training_criterion(self):
        """
        :return: the torch criterion for the configured loss function
        """
        return self.lossFn.create_criterion()

    def get_output_dim_weights(self) -> Optional[np.ndarray]:
        """
        :return: None (output dimensions are not weighted in classification)
        """
        return None

    class ValidationLossEvaluator(NNLossEvaluatorFixedDim.ValidationLossEvaluator):
        """
        Aggregates the summed loss across validation batches and reports the mean loss per sample.
        """
        def __init__(self, cuda: bool, loss_fn: "NNLossEvaluatorClassification.LossFunction"):
            """
            :param cuda: whether the criterion shall be moved to the GPU
            :param loss_fn: the loss function from which the criterion is created
            """
            self.loss_fn = loss_fn
            self.total_loss = None
            self.num_validation_samples = None
            self.criterion = self.loss_fn.create_criterion()
            if cuda:
                self.criterion = self.criterion.cuda()

        def start_validation_collection(self, ground_truth_shape):
            """
            Resets the aggregated loss and sample count.

            :param ground_truth_shape: the shape of a single ground truth data point (unused)
            """
            self.total_loss = 0
            self.num_validation_samples = 0

        def process_validation_result_batch(self, output, ground_truth):
            """
            Adds the batch's summed loss and its number of samples (first dimension of the output) to the totals.

            :param output: the model's output for the batch
            :param ground_truth: the corresponding ground truth
            """
            self.total_loss += self.criterion(output, ground_truth).item()
            self.num_validation_samples += output.shape[0]

        def end_validation_collection(self):
            """
            Computes the mean loss per validation sample.

            :return: an ordered dictionary containing the mean loss under key "CE" (along with
                "GeoMeanProbTrueClass", i.e. exp(-mean loss)) for cross-entropy or under key "NLL"
                for negative log-likelihood
            :raises ValueError: if the criterion is neither a cross-entropy nor an NLL loss
            """
            mean_loss = self.total_loss / self.num_validation_samples
            if isinstance(self.criterion, nn.CrossEntropyLoss):
                return OrderedDict([("CE", mean_loss), ("GeoMeanProbTrueClass", math.exp(-mean_loss))])
            if isinstance(self.criterion, nn.NLLLoss):
                # an ordered dictionary is returned in all cases, in line with the base class' declaration
                return OrderedDict([("NLL", mean_loss)])
            raise ValueError(f"Unsupported criterion for validation: {self.criterion}")

    def get_validation_metric_name(self):
        """
        :return: the validation metric key of the configured loss function
        """
        return self.lossFn.get_validation_metric_key()
class NNOptimiserParams(ToStringMixin):
    """
    Parameters which configure the training process carried out by the neural network optimiser.
    """
    # legacy parameter names which are dropped when converting dictionaries/states
    REMOVED_PARAMS = {"cuda"}
    # mapping from legacy (camel case) parameter names to the current names
    RENAMED_PARAMS = {
        "optimiserClip": "optimiser_clip",
        "lossEvaluator": "loss_evaluator",
        "optimiserLR": "optimiser_lr",
        "earlyStoppingEpochs": "early_stopping_epochs",
        "batchSize": "batch_size",
        "trainFraction": "train_fraction",
        "scaledOutputs": "scaled_outputs",
        "useShrinkage": "use_shrinkage",
        "shrinkageClip": "shrinkage_clip",
    }

    def __init__(self, loss_evaluator: NNLossEvaluator = None, gpu: Optional[int] = None, optimiser: Union[str, Optimiser] = "adam",
            optimiser_lr=0.001, early_stopping_epochs=None, batch_size=None, epochs=1000, train_fraction=0.75,
            scaled_outputs=False, use_shrinkage=True, shrinkage_clip=10., shuffle=True,
            optimiser_args: Optional[Dict[str, Any]] = None):
        """
        :param loss_evaluator: the loss evaluator to use
        :param gpu: the index of the GPU to be used (if CUDA is enabled for the model to be trained); if None, default
            to first GPU
        :param optimiser: the optimiser to use
        :param optimiser_lr: the optimiser's learning rate
        :param early_stopping_epochs: the number of epochs without validation score improvement after which to abort
            training and use the best epoch's model (early stopping); if None, never abort training before all epochs
            are completed
        :param batch_size: the batch size to use; for algorithms L-BFGS (optimiser='lbfgs'), which do not use batches,
            leave this at None. If the algorithm uses batches and None is specified, batch size 64 will be used by default.
        :param epochs: the maximum number of training epochs
        :param train_fraction: the fraction of the data used for training (with the remainder being used for validation).
            If no validation is to be performed, pass 1.0.
        :param scaled_outputs: whether to scale all outputs, resulting in computations of the loss function based on
            scaled values rather than normalised values. Enabling scaling may not be appropriate in cases where there
            are multiple outputs on different scales/with completely different units.
        :param use_shrinkage: whether to apply shrinkage to gradients whose norm exceeds ``shrinkage_clip``, scaling
            the gradient down to ``shrinkage_clip``
        :param shrinkage_clip: the maximum gradient norm beyond which to apply shrinkage (if ``use_shrinkage`` is True)
        :param shuffle: whether to shuffle the training data
        :param optimiser_args: keyword arguments to be passed on to the actual torch optimiser
        """
        if Optimiser.from_name_or_instance(optimiser) == Optimiser.LBFGS:
            # an integer is used, since batch sizes are integers (the value compares equal to 1e12)
            large_batch_size = 10 ** 12
            if batch_size is not None:
                log.warning(f"LBFGS does not make use of batches, therefore using large batch size {large_batch_size} "
                    f"to achieve use of a single batch")
            batch_size = large_batch_size
        else:
            if batch_size is None:
                log.debug("No batch size was specified, using batch size 64 by default")
                batch_size = 64

        self.epochs = epochs
        self.batch_size = batch_size
        self.optimiser_lr = optimiser_lr
        self.shrinkage_clip = shrinkage_clip
        self.optimiser = optimiser
        self.gpu = gpu
        self.train_fraction = train_fraction
        self.scaled_outputs = scaled_outputs
        self.loss_evaluator = loss_evaluator
        self.optimiser_args = optimiser_args if optimiser_args is not None else {}
        self.use_shrinkage = use_shrinkage
        self.early_stopping_epochs = early_stopping_epochs
        self.shuffle = shuffle

    @classmethod
    def _updated_params(cls, params: dict) -> dict:
        """
        :param params: a dictionary of parameters, which may use legacy parameter names
        :return: a new dictionary in which renamed parameters use their current names and removed parameters
            are absent
        """
        return {cls.RENAMED_PARAMS.get(k, k): v for k, v in params.items() if k not in cls.REMOVED_PARAMS}

    def __setstate__(self, state):
        # states persisted without the shuffle attribute receive the default value
        if "shuffle" not in state:
            state["shuffle"] = True
        self.__dict__ = self._updated_params(state)

    @classmethod
    def from_dict_or_instance(cls, nn_optimiser_params: Union[dict, "NNOptimiserParams"]) -> "NNOptimiserParams":
        """
        :param nn_optimiser_params: either an instance or a dictionary of constructor arguments
        :return: the given instance or a new instance created from the dictionary
        """
        if isinstance(nn_optimiser_params, NNOptimiserParams):
            return nn_optimiser_params
        else:
            return cls.from_dict(nn_optimiser_params)

    @classmethod
    def from_dict(cls, params: dict) -> "NNOptimiserParams":
        """
        :param params: a dictionary of constructor arguments, which may use legacy parameter names
        :return: a new instance
        """
        return NNOptimiserParams(**cls._updated_params(params))

    # TODO remove deprecated dict interface
    @classmethod
    def from_either_dict_or_instance(cls, nn_optimiser_dict_params: dict, nn_optimiser_params: Optional["NNOptimiserParams"]):
        """
        :param nn_optimiser_dict_params: a dictionary of constructor arguments (may be empty or None)
        :param nn_optimiser_params: an instance (may be None)
        :return: the given instance if there is one, otherwise a new instance created from the dictionary
        :raises ValueError: if both an instance and a non-empty dictionary are given
        """
        have_instance = nn_optimiser_params is not None
        # None is treated like an empty dictionary
        have_dict = bool(nn_optimiser_dict_params)
        if have_instance and have_dict:
            raise ValueError("Received both a non-empty dictionary and an instance")
        if have_instance:
            return nn_optimiser_params
        else:
            return NNOptimiserParams.from_dict(nn_optimiser_dict_params or {})
class NNOptimiser:
    """
    Trains the torch module of a model, optionally using validation data for the selection of the
    best epoch's model and for early stopping.
    """
    log = log.getChild(__qualname__)

    def __init__(self, params: NNOptimiserParams):
        """
        :param params: parameters
        :raises ValueError: if the parameters do not specify a loss evaluator
        """
        if params.loss_evaluator is None:
            raise ValueError("Must provide a loss evaluator")

        self.params = params
        self.cuda = None
        self.best_epoch = None

    def __str__(self):
        return f"{self.__class__.__name__}[params={self.params}]"

    def fit(self, model: "TorchModel", data: Union[DataUtil, List[DataUtil], TorchDataSetProvider, List[TorchDataSetProvider],
            TorchDataSet, List[TorchDataSet], Tuple[TorchDataSet, TorchDataSet], List[Tuple[TorchDataSet, TorchDataSet]]],
            create_torch_module=True) -> "TrainingInfo":
        """
        Fits the parameters of the given model to the given data, which can be a list of or single instance of one of
        the following:

            * a `DataUtil` or `TorchDataSetProvider` (from which a training set and validation set will be obtained
              according to the `train_fraction` parameter of this object)
            * a `TorchDataSet` which shall be used as the training set (for the case where no validation set shall be used)
            * a tuple with two `TorchDataSet` instances, where the first shall be used as the training set and the
              second as the validation set

        Validation is performed if and only if `train_fraction` is not 1.0.

        :param model: the model to be fitted
        :param data: the data to use (see variants above)
        :param create_torch_module: whether to newly create the torch module that is to be trained from the model's
            factory. If False, (re-)train the existing module.
        :return: information on the training process
        :raises ValueError: if the data is inadmissible (e.g. validation is enabled but no validation set is available)
        """
        self.cuda = model.cuda
        self.log.info(f"Preparing parameter learning of {model} via {self} with cuda={self.cuda}")

        use_validation = self.params.train_fraction != 1.0

        def to_data_set_provider(d) -> TorchDataSetProvider:
            if isinstance(d, TorchDataSetProvider):
                return d
            elif isinstance(d, DataUtil):
                return TorchDataSetProviderFromDataUtil(d, self.cuda)
            else:
                raise ValueError(f"Cannot create a TorchDataSetProvider from {d}")

        training_log_entries = []

        def training_log(s):
            self.log.info(s)
            training_log_entries.append(s)

        self._init_cuda()

        # Set the random seed manually for reproducibility.
        seed = 42
        torch.manual_seed(seed)
        if self.cuda:
            torchcuda.manual_seed_all(seed)
        torch.backends.cudnn.benchmark = False
        torch.backends.cudnn.deterministic = True

        # obtain data, splitting it into training and validation set(s)
        training_sets = []
        output_scalers = []  # aligned with training_sets
        validation_sets = []
        validation_output_scalers = []  # aligned with validation_sets (not every data item has a validation set)
        if not isinstance(data, list):
            data = [data]
        self.log.info("Obtaining input/output training instances")
        for idx_data_item, data_item in enumerate(data):
            if isinstance(data_item, TorchDataSet):
                if use_validation:
                    raise ValueError("Passing a TorchDataSet instance is not admissible when validation is enabled (trainFraction != 1.0). "
                        "Pass a TorchDataSetProvider or another representation that supports validation instead.")
                training_set = data_item
                validation_set = None
                output_scaler = TensorScalerIdentity()
            elif isinstance(data_item, tuple):
                training_set, validation_set = data_item
                output_scaler = TensorScalerIdentity()
            else:
                data_set_provider = to_data_set_provider(data_item)
                training_set, validation_set = data_set_provider.provide_split(self.params.train_fraction)
                output_scaler = data_set_provider.get_output_tensor_scaler()
            training_sets.append(training_set)
            output_scalers.append(output_scaler)
            if validation_set is not None:
                validation_sets.append(validation_set)
                validation_output_scalers.append(output_scaler)
            training_log(f"Data set {idx_data_item+1}/{len(data)}: #train={training_set.size()}, "
                f"#validation={validation_set.size() if validation_set is not None else 'None'}")
        training_log("Number of validation sets: %d" % len(validation_sets))
        if use_validation and len(validation_sets) == 0:
            raise ValueError("Validation is enabled (train_fraction != 1.0) but the given data provides no validation set")

        torch_model = model.create_torch_module() if create_torch_module else model.get_torch_module()
        if self.cuda:
            torch_model.cuda()
        model.set_torch_module(torch_model)

        n_params = sum([p.nelement() for p in torch_model.parameters()])
        self.log.info(f"Learning parameters of {model}")
        training_log('Number of parameters: %d' % n_params)
        training_log(f"Starting training process via {self}")

        loss_evaluator = self.params.loss_evaluator

        total_epochs = None
        # start at infinity, such that the first valid validation result is always an improvement
        best_val = math.inf
        best_epoch = 0
        optim = _Optimiser(torch_model.parameters(), method=self.params.optimiser, lr=self.params.optimiser_lr,
            max_grad_norm=self.params.shrinkage_clip, use_shrinkage=self.params.use_shrinkage, **self.params.optimiser_args)

        best_model_bytes = model.get_module_bytes()
        loss_evaluation = loss_evaluator.start_evaluation(self.cuda)
        validation_metric_name = loss_evaluator.get_validation_metric_name()
        training_loss_values = []
        validation_metric_values = []
        try:
            self.log.info(f'Begin training with cuda={self.cuda}')
            self.log.info('Press Ctrl+C to end training early')
            for epoch in range(1, self.params.epochs + 1):
                loss_evaluation.start_epoch()
                epoch_start_time = time.time()

                # perform training step, processing all the training data once
                train_loss = self._train(training_sets, torch_model, optim, loss_evaluation, self.params.batch_size, output_scalers)
                training_loss_values.append(train_loss)

                # perform validation, computing the mean metrics across all validation sets (if more than one),
                # and check for new best result according to validation results
                is_new_best = False
                if use_validation:
                    metrics = self._compute_mean_validation_metrics(validation_sets, validation_output_scalers, torch_model,
                        loss_evaluation)
                    current_val = metrics[validation_metric_name]
                    validation_metric_values.append(current_val)
                    is_new_best = current_val < best_val
                    if is_new_best:
                        best_val = current_val
                        best_epoch = epoch
                        best_str = "best {:s} {:5.6f} from this epoch".format(validation_metric_name, best_val)
                    else:
                        best_str = "best {:s} {:5.6f} from epoch {:d}".format(validation_metric_name, best_val, best_epoch)
                    val_str = f' | validation {", ".join(["%s %5.4f" % e for e in metrics.items()])} | {best_str}'
                else:
                    val_str = ""
                training_log(
                    'Epoch {:3d}/{} completed in {:5.2f}s | train loss {:5.4f}{:s}'.format(
                        epoch, self.params.epochs, (time.time() - epoch_start_time), train_loss, val_str))
                total_epochs = epoch
                if use_validation:
                    if is_new_best:
                        best_model_bytes = model.get_module_bytes()

                    # check for early stopping
                    num_epochs_without_improvement = epoch - best_epoch
                    if self.params.early_stopping_epochs is not None and \
                            num_epochs_without_improvement >= self.params.early_stopping_epochs:
                        training_log(f"Stopping early: {num_epochs_without_improvement} epochs without validation metric improvement")
                        break

            training_log("Training complete")
        except KeyboardInterrupt:
            training_log('Exiting from training early because of keyboard interrupt')

        # reload best model according to validation results
        if use_validation:
            training_log(f'Best model is from epoch {best_epoch} with {validation_metric_name} {best_val} on validation set')
            self.best_epoch = best_epoch
            model.set_module_bytes(best_model_bytes)

        return TrainingInfo(best_epoch=best_epoch if use_validation else None, log=training_log_entries, total_epochs=total_epochs,
            training_loss_sequence=training_loss_values, validation_metric_sequence=validation_metric_values)

    def _compute_mean_validation_metrics(self, validation_sets: Sequence[TorchDataSet], output_scalers: Sequence[TensorScaler],
            model: nn.Module, loss_evaluation: NNLossEvaluator.Evaluation) -> Dict[str, float]:
        """Evaluates the model on every validation set and returns the mean of each metric across the sets"""
        metrics_keys = None
        metrics_sum = None
        for validation_set, output_scaler in zip(validation_sets, output_scalers):
            metrics = self._evaluate(validation_set, model, loss_evaluation, output_scaler)
            metrics_array = np.array(list(metrics.values()), dtype=float)
            if metrics_sum is None:
                # the metric names are taken from the first validation set
                metrics_keys = list(metrics.keys())
                metrics_sum = metrics_array
            else:
                metrics_sum = metrics_sum + metrics_array
        return dict(zip(metrics_keys, metrics_sum / len(validation_sets)))

    def _apply_model(self, model, input: Union[torch.Tensor, Sequence[torch.Tensor]], ground_truth, output_scaler: TensorScaler):
        """Applies the model to the input, denormalising output and ground truth if scaled outputs are enabled"""
        if isinstance(input, torch.Tensor):
            output = model(input)
        else:
            # a sequence of tensors is passed to the model as separate arguments
            output = model(*input)
        if self.params.scaled_outputs:
            output, ground_truth = self._scaled_values(output, ground_truth, output_scaler)
        return output, ground_truth

    @classmethod
    def _scaled_values(cls, model_output, ground_truth, output_scaler):
        """Applies the scaler's denormalisation to both the model output and the ground truth"""
        scaled_output = output_scaler.denormalise(model_output)
        scaled_truth = output_scaler.denormalise(ground_truth)
        return scaled_output, scaled_truth

    def _train(self, data_sets: Sequence[TorchDataSet], model: nn.Module, optim: _Optimiser,
            loss_evaluation: NNLossEvaluator.Evaluation, batch_size: int, output_scalers: Sequence[TensorScaler]):
        """Performs one training epoch"""
        model.train()
        for data_set, output_scaler in zip(data_sets, output_scalers):
            for X, Y in data_set.iter_batches(batch_size, shuffle=self.params.shuffle):
                def closure():
                    model.zero_grad()
                    output, ground_truth = self._apply_model(model, X, Y, output_scaler)
                    loss = loss_evaluation.compute_train_batch_loss(output, ground_truth, X, Y)
                    loss.backward()
                    return loss

                optim.step(closure)
        return loss_evaluation.get_epoch_train_loss()

    def _evaluate(self, data_set: TorchDataSet, model: nn.Module, loss_evaluation: NNLossEvaluator.Evaluation,
            output_scaler: TensorScaler):
        """Evaluates the model on the given data set (a validation set)"""
        model.eval()
        for X, Y in data_set.iter_batches(self.params.batch_size, shuffle=False):
            with torch.no_grad():
                output, ground_truth = self._apply_model(model, X, Y, output_scaler)
            loss_evaluation.process_validation_batch(output, ground_truth, X, Y)
        return loss_evaluation.get_validation_metrics()

    def _init_cuda(self):
        """Initialises CUDA (for learning) by setting the appropriate device if necessary"""
        if self.cuda:
            device_count = torchcuda.device_count()
            if device_count == 0:
                raise Exception("CUDA is enabled but no device found")
            if self.params.gpu is None:
                if device_count > 1:
                    log.warning("More than one GPU detected but no GPU index was specified, using GPU 0 by default.")
                gpu_index = 0
            else:
                gpu_index = self.params.gpu
            torchcuda.set_device(gpu_index)
        elif torchcuda.is_available():
            self.log.info("NOTE: You have a CUDA device; consider running with cuda=True")
class TrainingInfo:
    """
    Holds information on a completed training process: the log, the sequences of training loss and
    validation metric values as well as the best epoch and the total number of epochs.
    """
    def __init__(self, best_epoch: int = None, log: List[str] = None, training_loss_sequence: Sequence[float] = None,
            validation_metric_sequence: Sequence[float] = None, total_epochs=None):
        """
        :param best_epoch: the (one-based) number of the best epoch; None if not applicable
        :param log: the training log entries
        :param training_loss_sequence: the training loss values, one per epoch
        :param validation_metric_sequence: the validation metric values, one per epoch
        :param total_epochs: the total number of epochs that were completed
        """
        self.validation_metric_sequence = validation_metric_sequence
        self.training_loss_sequence = training_loss_sequence
        self.log = log
        self.best_epoch = best_epoch
        self.total_epochs = total_epochs

    def __setstate__(self, state):
        # Ensure that the attribute total_epochs always exists after unpickling: states which lack it
        # receive the value stored under the legacy key "totalEpochs" if present and None otherwise
        if "total_epochs" not in state:
            state["total_epochs"] = state.pop("totalEpochs", None)
        self.__dict__ = state

    def get_training_loss_series(self) -> pd.Series:
        """
        :return: the training loss values as a series named "training loss", indexed by one-based epoch number
        """
        return self._create_series_with_one_based_index(self.training_loss_sequence, name="training loss")

    def get_validation_metric_series(self) -> pd.Series:
        """
        :return: the validation metric values as a series named "validation metric", indexed by one-based
            epoch number
        """
        return self._create_series_with_one_based_index(self.validation_metric_sequence, name="validation metric")

    def _create_series_with_one_based_index(self, sequence: Sequence, name: str):
        """Creates a series with the given name from the sequence, shifting the default index to start at 1"""
        series = pd.Series(sequence, name=name)
        series.index += 1
        return series

    def plot_all(self) -> "matplotlib.figure.Figure":
        """
        Plots both the sequence of training loss values and the sequence of validation metric values.
        The best epoch is marked by a vertical line if it is known.

        :return: the figure
        """
        ts = self.get_training_loss_series()
        vs = self.get_validation_metric_series()

        fig, primary_ax = plt.subplots(1, 1)
        secondary_ax = primary_ax.twinx()

        training_line = primary_ax.plot(ts, color='blue')
        validation_line = secondary_ax.plot(vs, color='orange')
        legend_handles = training_line + validation_line
        legend_labels = [ts.name, vs.name]
        # the best epoch is None if no validation was performed, in which case there is nothing to mark
        if self.best_epoch is not None:
            best_epoch_line = primary_ax.axvline(self.best_epoch, color='black', linestyle='dashed')
            legend_handles = legend_handles + [best_epoch_line]
            legend_labels.append("best epoch")

        primary_ax.set_xlabel("epoch")
        primary_ax.set_ylabel(ts.name)
        secondary_ax.set_ylabel(vs.name)

        primary_ax.legend(legend_handles, legend_labels)
        plt.tight_layout()

        return fig