Coverage for src/sensai/normalisation.py: 72%

54 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-29 18:29 +0000

1from enum import Enum 

2from typing import Union 

3 

4import numpy as np 

5import pandas as pd 

6import sklearn.preprocessing 

7 

8from .util.dtype import to_float_array 

9 

10 

class NormalisationMode(Enum):
    """
    Enumeration of the normalisation strategies that can be selected for a VectorDataScaler.
    """
    # no scaling parameters are computed; data passes through without scaling or translation
    NONE = "none"
    # every column is divided by the maximum absolute value found in the entire array
    MAX_ALL = "max_all"
    # each column is divided by the maximum absolute value found in that column
    MAX_BY_COLUMN = "max_by_column"
    # each column is shifted and scaled using the mean and scale fitted by sklearn's StandardScaler
    STANDARDISED = "standardised"

16 

17 

class VectorDataScaler:
    """
    Normalises and denormalises tabular data using scaling parameters that are computed once
    from a reference data frame.

    Normalisation computes ``(data - translate) / scale`` and denormalisation computes
    ``data * scale + translate``, where a parameter that is None is skipped.
    """
    def __init__(self, data_frame: pd.DataFrame, normalisation_mode: NormalisationMode):
        """
        :param data_frame: the data frame from whose values the scaling parameters are computed
        :param normalisation_mode: the normalisation mode that determines how the parameters are computed
        """
        self.normalisation_mode = normalisation_mode
        # each of scale/translate is either an array with one entry per column or None (parameter not used)
        self.scale, self.translate = self._compute_scaling_params(data_frame.values, normalisation_mode)
        self.dimension_names = list(data_frame.columns)

    @staticmethod
    def _replace_zero_scale(scale: np.ndarray) -> np.ndarray:
        """
        Replaces zero entries of a scale vector by 1 such that dividing by the vector is always well-defined.

        A zero scale arises when all the values it was computed from are zero; such values are then
        left unchanged by normalisation instead of being turned into NaN.
        This mirrors what sklearn's StandardScaler does for features with zero variance.

        :param scale: the scale vector (modified in place)
        :return: the given scale vector
        """
        scale[scale == 0] = 1.0
        return scale

    @classmethod
    def _compute_scaling_params(cls, raw_array: np.ndarray, normalisation_mode: NormalisationMode):
        """
        :param raw_array: numpy array containing raw data; must be 2D unless the mode is NONE
        :param normalisation_mode: the normalisation mode:
            NONE (no parameters),
            MAX_ALL (scale by the maximum absolute value in the entire array),
            MAX_BY_COLUMN (scale each column by its own maximum absolute value),
            STANDARDISED (translate by mean and scale as fitted by sklearn's StandardScaler)
        :return: a pair (scale, translate), where each element is either an array with one entry per column
            or None if the mode does not use the respective parameter
        :raises ValueError: if the array is not 2D (for modes other than NONE) or the mode is unknown
        """
        if normalisation_mode == NormalisationMode.NONE:
            return None, None

        if raw_array.ndim != 2:
            raise ValueError(f"Only 2D arrays are supported by {cls.__name__} with mode {normalisation_mode}")
        dim = raw_array.shape[1]

        translate = None
        if normalisation_mode == NormalisationMode.MAX_ALL:
            # the same global maximum is used for every column
            scale = cls._replace_zero_scale(np.full(dim, np.max(np.abs(raw_array)), dtype=float))
        elif normalisation_mode == NormalisationMode.MAX_BY_COLUMN:
            # astype(float) always yields a new float array, as the original np.ones-based vector was
            scale = cls._replace_zero_scale(np.max(np.abs(raw_array), axis=0).astype(float))
        elif normalisation_mode == NormalisationMode.STANDARDISED:
            standard_scaler = sklearn.preprocessing.StandardScaler()
            standard_scaler.fit(raw_array)
            translate = standard_scaler.mean_
            scale = standard_scaler.scale_
        else:
            raise ValueError("Unknown normalization mode")
        return scale, translate

    @staticmethod
    def _array(data: Union[pd.DataFrame, np.ndarray]):
        """
        :param data: the data to convert
        :return: the result of applying to_float_array (from .util.dtype) to the data
        """
        return to_float_array(data)

    def get_normalised_array(self, data: Union[pd.DataFrame, np.ndarray]) -> np.ndarray:
        """
        :param data: the data to normalise
        :return: the data after subtracting the translation (if any) and dividing by the scale (if any)
        """
        result = self._array(data)
        if self.translate is not None:
            result = result - self.translate
        if self.scale is not None:
            result = result / self.scale
        return result

    def get_denormalised_array(self, data: Union[pd.DataFrame, np.ndarray]) -> np.ndarray:
        """
        Inverts the transformation applied by get_normalised_array.

        :param data: the normalised data
        :return: the data after multiplying by the scale (if any) and adding the translation (if any)
        """
        result = self._array(data)
        # inverse operations are applied in reverse order: undo the scaling first, then the translation
        if self.scale is not None:
            result = result * self.scale
        if self.translate is not None:
            result = result + self.translate
        return result