Coverage for src/sensai/columngen.py: 33%

78 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-29 18:29 +0000

1from abc import ABC, abstractmethod 

2import logging 

3from typing import Any, Union, Optional 

4 

5import numpy as np 

6import pandas as pd 

7 

8from .data_transformation import DFTNormalisation 

9from .featuregen import FeatureGeneratorFromColumnGenerator 

10from .util.cache import KeyValueCache 

11 

12 

13log = logging.getLogger(__name__) 

14 

15 

16class ColumnGenerator: 

17 """ 

18 Generates a single column (pd.Series) from an input data frame, which is to have the same index as the input 

19 """ 

20 def __init__(self, generated_column_name: str): 

21 """ 

22 :param generated_column_name: the name of the column being generated 

23 """ 

24 self.generatedColumnName = generated_column_name 

25 

26 def generate_column(self, df: pd.DataFrame) -> pd.Series: 

27 """ 

28 Generates a column from the input data frame 

29 

30 :param df: the input data frame 

31 :return: the column as a named series, which has the same index as the input 

32 """ 

33 result = self._generate_column(df) 

34 if isinstance(result, pd.Series): 

35 result.name = self.generatedColumnName 

36 else: 

37 result = pd.Series(result, index=df.index, name=self.generatedColumnName) 

38 return result 

39 

40 @abstractmethod 

41 def _generate_column(self, df: pd.DataFrame) -> Union[pd.Series, list, np.ndarray]: 

42 """ 

43 Performs the actual column generation 

44 

45 :param df: the input data frame 

46 :return: a list/array of the same length as df or a series with the same index 

47 """ 

48 pass 

49 

50 def to_feature_generator(self, 

51 take_input_column_if_present: bool = False, 

52 normalisation_rule_template: DFTNormalisation.RuleTemplate = None, 

53 is_categorical: bool = False): 

54 """ 

55 Transforms this column generator into a feature generator that can be used as part of a VectorModel. 

56 

57 :param take_input_column_if_present: if True, then if a column whose name corresponds to the column to generate exists 

58 in the input data, simply copy it to generate the output (without using the column generator); if False, always 

59 apply the columnGen to generate the output 

60 :param is_categorical: whether the resulting column is categorical 

61 :param normalisation_rule_template: template for a DFTNormalisation for the resulting column. 

62 This should only be provided if is_categorical is False 

63 :return: 

64 """ 

65 return FeatureGeneratorFromColumnGenerator(self, 

66 take_input_column_if_present=take_input_column_if_present, 

67 normalisation_rule_template=normalisation_rule_template, 

68 is_categorical=is_categorical) 

69 

70 

71class IndexCachedColumnGenerator(ColumnGenerator): 

72 """ 

73 Decorator for a column generator which adds support for cached column generation where cache keys are given by the input data frame's 

74 index. Entries not found in the cache are computed by the wrapped column generator. 

75 

76 The main use case for this class is to add caching to existing ColumnGenerators. For creating a new caching 

77 ColumnGenerator the use of ColumnGeneratorCachedByIndex is encouraged. 

78 """ 

79 

80 log = log.getChild(__qualname__) 

81 

82 def __init__(self, column_generator: ColumnGenerator, cache: KeyValueCache): 

83 """ 

84 :param column_generator: the column generator with which to generate values for keys not found in the cache 

85 :param cache: the cache in which to store key-value pairs 

86 """ 

87 super().__init__(column_generator.generatedColumnName) 

88 self.columnGenerator = column_generator 

89 self.cache = cache 

90 

91 def _generate_column(self, df: pd.DataFrame) -> pd.Series: 

92 # compute series of cached values 

93 cache_values = [self.cache.get(nt.Index) for nt in df.itertuples()] 

94 cache_series = pd.Series(cache_values, dtype=object, index=df.index).dropna() 

95 

96 # compute missing values (if any) via wrapped generator, storing them in the cache 

97 missing_values_df = df[~df.index.isin(cache_series.index)] 

98 self.log.info(f"Retrieved {len(cache_series)} values from the cache, {len(missing_values_df)} still to be computed by " 

99 f"{self.columnGenerator}") 

100 if len(missing_values_df) == 0: 

101 return cache_series 

102 else: 

103 missing_series = self.columnGenerator.generate_column(missing_values_df) 

104 for key, value in missing_series.iteritems(): 

105 self.cache.set(key, value) 

106 return pd.concat((cache_series, missing_series)) 

107 

108 

109class ColumnGeneratorCachedByIndex(ColumnGenerator, ABC): 

110 """ 

111 Base class for column generators, which supports cached column generation, each value being generated independently. 

112 Cache keys are given by the input data frame's index. 

113 """ 

114 

115 log = log.getChild(__qualname__) 

116 

117 def __init__(self, generated_column_name: str, cache: Optional[KeyValueCache], persist_cache=False): 

118 """ 

119 :param generated_column_name: the name of the column being generated 

120 :param cache: the cache in which to store key-value pairs. If None, caching will be disabled 

121 :param persist_cache: whether to persist the cache when pickling 

122 """ 

123 super().__init__(generated_column_name) 

124 self.cache = cache 

125 self.persistCache = persist_cache 

126 

127 def _generate_column(self, df: pd.DataFrame) -> Union[pd.Series, list, np.ndarray]: 

128 self.log.info(f"Generating column {self.generatedColumnName} with {self.__class__.__name__}") 

129 values = [] 

130 cache_hits = 0 

131 column_length = len(df) 

132 percentage_to_log = 0 

133 for i, namedTuple in enumerate(df.itertuples()): 

134 percentage_generated = int(100*i/column_length) 

135 if percentage_generated == percentage_to_log: 

136 self.log.debug(f"Processed {percentage_to_log}% of {self.generatedColumnName}") 

137 percentage_to_log += 5 

138 

139 key = namedTuple.Index 

140 if self.cache is not None: 

141 value = self.cache.get(key) 

142 if value is None: 

143 value = self._generate_value(namedTuple) 

144 self.cache.set(key, value) 

145 else: 

146 cache_hits += 1 

147 else: 

148 value = self._generate_value(namedTuple) 

149 values.append(value) 

150 if self.cache is not None: 

151 self.log.info(f"Cached column generation resulted in {cache_hits}/{column_length} cache hits") 

152 return values 

153 

154 def __getstate__(self): 

155 if not self.persistCache: 

156 d = self.__dict__.copy() 

157 d["cache"] = None 

158 return d 

159 return self.__dict__ 

160 

161 @abstractmethod 

162 def _generate_value(self, named_tuple) -> Any: 

163 pass