Coverage for src/sensai/nearest_neighbors.py: 29%

249 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-13 22:17 +0000

1import collections 

2import datetime 

3import logging 

4import typing 

5from abc import ABC, abstractmethod 

6from typing import Callable, List, Iterable, Optional 

7 

8import numpy as np 

9import pandas as pd 

10 

11from . import util, data_transformation 

12from .distance_metric import DistanceMetric 

13from .featuregen import FeatureGeneratorFromNamedTuples 

14from .util.string import object_repr 

15from .util.typing import PandasNamedTuple 

16from .vector_model import VectorClassificationModel, VectorRegressionModel 

17 

18log = logging.getLogger(__name__) 

19 

20 

21class Neighbor: 

22 def __init__(self, value: PandasNamedTuple, distance: float): 

23 self.distance = distance 

24 self.value = value 

25 self.identifier = value.Index 

26 

27 

28class NeighborProvider(ABC): 

29 def __init__(self, df_indexed_by_id: pd.DataFrame): 

30 self.df = df_indexed_by_id 

31 self.index = self.df.index 

32 if any(self.index.duplicated()): 

33 raise Exception("Dataframe index should not contain duplicates") 

34 self.index_position_dict = {idx: pos for pos, idx in enumerate(self.index)} 

35 

36 @abstractmethod 

37 def iter_potential_neighbors(self, value: PandasNamedTuple) -> Iterable[PandasNamedTuple]: 

38 pass 

39 

40 @abstractmethod 

41 def __str__(self): 

42 return super().__str__() 

43 

44 

45class AllNeighborsProvider(NeighborProvider): 

46 def __init__(self, df_indexed_by_id: pd.DataFrame): 

47 super().__init__(df_indexed_by_id) 

48 self.named_tuples = None 

49 

50 def __getstate__(self): 

51 d = self.__dict__.copy() 

52 d["namedTuples"] = None 

53 return d 

54 

55 def iter_potential_neighbors(self, value): 

56 identifier = value.Index 

57 if self.named_tuples is None: 

58 self.named_tuples = list(self.df.itertuples()) 

59 for nt in self.named_tuples: 

60 if nt.Index != identifier: 

61 yield nt 

62 

63 def __str__(self): 

64 return str(self.__class__.__name__) 

65 

66 

67class TimerangeNeighborsProvider(NeighborProvider): 

68 def __init__(self, df_indexed_by_id: pd.DataFrame, timestamps_column="timestamps", 

69 past_time_range_days=120, future_time_range_days=120): 

70 super().__init__(df_indexed_by_id) 

71 if not pd.core.dtypes.common.is_datetime64_any_dtype(self.df[timestamps_column]): 

72 raise Exception(f"Column {timestamps_column} does not have a compatible datatype") 

73 self.timestamps_column = timestamps_column 

74 self.past_time_range_days = past_time_range_days 

75 self.future_time_range_days = future_time_range_days 

76 self.past_timedelta = datetime.timedelta(days=past_time_range_days) 

77 self.future_timedelta = datetime.timedelta(days=future_time_range_days) 

78 

79 def iter_potential_neighbors(self, value: PandasNamedTuple): 

80 identifier = value.Index 

81 input_time = getattr(value, self.timestamps_column) 

82 max_time, min_time = input_time + self.future_timedelta, input_time - self.past_timedelta 

83 neighbors_df = self.df[ 

84 self.df[self.timestamps_column].apply(lambda time: min_time < time < input_time) 

85 ] 

86 if identifier in neighbors_df.index: 

87 neighbors_df.drop(identifier, inplace=True) 

88 return neighbors_df.itertuples() 

89 

90 def __str__(self): 

91 return object_repr(self, ["past_time_range_days", "future_time_range_days"]) 

92 

93 

94class AbstractKnnFinder(ABC): 

95 @abstractmethod 

96 def find_neighbors(self, named_tuple: PandasNamedTuple, n_neighbors=20) -> List[Neighbor]: 

97 pass 

98 

99 @abstractmethod 

100 def __str__(self): 

101 super().__str__() 

102 

103 

104class CachingKNearestNeighboursFinder(AbstractKnnFinder): 

105 """ 

106 A nearest neighbor finder which uses a cache for distance metrics in order speed up repeated computations 

107 of the neighbors of the same data point by keeping a pandas.Series of distances to all provided 

108 data points cached. If the distance metric is of the composite type LinearCombinationDistanceMetric, 

109 its component distance metrics are cached, such that weights in the linear combination can be varied 

110 without necessitating recomputations. 

111 """ 

112 log = log.getChild(__qualname__) 

113 

114 def __init__(self, cache: 'CachingKNearestNeighboursFinder.DistanceMetricCache', distance_metric: DistanceMetric, 

115 neighbor_provider: NeighborProvider): 

116 self.neighbor_provider = neighbor_provider 

117 # This field is purely for logging purposes 

118 self.distance_metric = distance_metric 

119 if isinstance(distance_metric, distance_metric.LinearCombinationDistanceMetric): 

120 self.weighted_distance_metrics = [(cache.get_cached_metric(dm), w) for (w, dm) in distance_metric.metrics] 

121 else: 

122 self.weighted_distance_metrics = [(cache.get_cached_metric(distance_metric), 1)] 

123 

124 def __str__(self): 

125 return object_repr(self, ["neighbor_provider", "distance_metric"]) 

126 

127 class DistanceMetricCache: 

128 """ 

129 A cache for distance metrics which identifies equivalent distance metrics by their string representations. 

130 The cache can be passed (consecutively) to multiple KNN models in order to speed up computations for the 

131 same test data points. If the cache is reused, it is assumed that the neighbor provider remains the same. 

132 """ 

133 log = log.getChild(__qualname__) 

134 

135 def __init__(self): 

136 self._cached_metrics_by_name = {} 

137 

138 def get_cached_metric(self, distance_metric): 

139 key = str(distance_metric) 

140 cached_metric = self._cached_metrics_by_name.get(key) 

141 if cached_metric is None: 

142 self.log.info(f"Creating new cached metric for key '{key}'") 

143 cached_metric = CachingKNearestNeighboursFinder.CachedSeriesDistanceMetric(distance_metric) 

144 self._cached_metrics_by_name[key] = cached_metric 

145 else: 

146 self.log.info(f"Reusing cached metric for key '{key}'") 

147 return cached_metric 

148 

149 class CachedSeriesDistanceMetric: 

150 """ 

151 Provides caching for a wrapped distance metric: the series of all distances to provided potential neighbors 

152 are retained in a cache 

153 """ 

154 def __init__(self, distance_metric): 

155 self.distance_metric = distance_metric 

156 self.cache = {} 

157 

158 def get_distance_series(self, named_tuple: PandasNamedTuple, potential_neighbor_values): 

159 identifier = named_tuple.Index 

160 series = self.cache.get(identifier) 

161 if series is None: 

162 distances = [] 

163 for neighborTuple in potential_neighbor_values: 

164 distances.append(self.distance_metric.distance(named_tuple, neighborTuple)) 

165 series = pd.Series(distances) 

166 self.cache[identifier] = series 

167 return series 

168 

169 def find_neighbors(self, named_tuple: PandasNamedTuple, n_neighbors=20) -> List[Neighbor]: 

170 potential_neighbors = list(self.neighbor_provider.iter_potential_neighbors(named_tuple)) 

171 summed_distance_series = None 

172 for i, (metric, weight) in enumerate(self.weighted_distance_metrics): 

173 weighted_distances_series = metric.get_distance_series(named_tuple, potential_neighbors) * weight 

174 if i == 0: 

175 summed_distance_series = weighted_distances_series.copy() 

176 else: 

177 summed_distance_series += weighted_distances_series 

178 summed_distance_series.sort_values(ascending=True, inplace=True) 

179 result = [] 

180 for i in range(n_neighbors): 

181 neighbor_tuple = potential_neighbors[summed_distance_series.index[i]] 

182 distance = summed_distance_series.iloc[i] 

183 result.append(Neighbor(neighbor_tuple, distance)) 

184 return result 

185 

186 

187class KNearestNeighboursFinder(AbstractKnnFinder): 

188 def __init__(self, distance_metric: DistanceMetric, neighbor_provider: NeighborProvider): 

189 self.neighbor_provider = neighbor_provider 

190 self.distance_metric = distance_metric 

191 

192 def __str__(self): 

193 return object_repr(self, ["neighbor_provider", "distance_metric"]) 

194 

195 def find_neighbors(self, named_tuple: PandasNamedTuple, n_neighbors=20) -> List[Neighbor]: 

196 result = [] 

197 log.debug(f"Finding neighbors for {named_tuple.Index}") 

198 for neighborTuple in self.neighbor_provider.iter_potential_neighbors(named_tuple): 

199 distance = self.distance_metric.distance(named_tuple, neighborTuple) 

200 result.append(Neighbor(neighborTuple, distance)) 

201 result.sort(key=lambda n: n.distance) 

202 return result[:n_neighbors] 

203 

204 

205class KNearestNeighboursClassificationModel(VectorClassificationModel): 

206 def __init__(self, num_neighbors: int, distance_metric: DistanceMetric, 

207 neighbor_provider_factory: Callable[[pd.DataFrame], NeighborProvider] = AllNeighborsProvider, 

208 distance_based_weighting=False, distance_epsilon=1e-3, 

209 distance_metric_cache: CachingKNearestNeighboursFinder.DistanceMetricCache = None, **kwargs): 

210 """ 

211 :param num_neighbors: the number of nearest neighbors to consider 

212 :param distance_metric: the distance metric to use 

213 :param neighbor_provider_factory: a factory with which a neighbor provider can be constructed using data 

214 :param distance_based_weighting: whether to weight neighbors according to their distance (inverse); if False, use democratic vote 

215 :param distance_epsilon: a distance that is added to all distances for distance-based weighting (in order to avoid 0 distances); 

216 :param distance_metric_cache: a cache for distance metrics which shall be used to store speed up repeated computations 

217 of the neighbors of the same data point by keeping series of distances cached (particularly for composite distance metrics); 

218 see class CachingKNearestNeighboursFinder 

219 :param kwargs: parameters to pass on to super-classes 

220 """ 

221 super().__init__(**kwargs) 

222 self.distance_epsilon = distance_epsilon 

223 self.distance_based_weighting = distance_based_weighting 

224 self.neighbor_provider_factory = neighbor_provider_factory 

225 self.num_neighbors = num_neighbors 

226 self.distance_metric = distance_metric 

227 self.distance_metric_cache = distance_metric_cache 

228 self.df = None 

229 self.y = None 

230 self.knn_finder = None 

231 

232 def _tostring_excludes(self) -> List[str]: 

233 return super()._tostring_excludes() + ["neighbor_provider_factory", "distance_metric", "distance_metric_cache", "df", "y"] 

234 

235 # noinspection DuplicatedCode 

236 def _fit_classifier(self, x: pd.DataFrame, y: pd.DataFrame): 

237 assert len(y.columns) == 1, "Expected exactly one column in label set Y" 

238 self.df = x.merge(y, how="inner", left_index=True, right_index=True) 

239 self.y = y 

240 neighbor_provider = self.neighbor_provider_factory(self.df) 

241 if self.distance_metric_cache is None: 

242 self.knn_finder = KNearestNeighboursFinder(self.distance_metric, neighbor_provider) 

243 else: 

244 self.knn_finder = CachingKNearestNeighboursFinder(self.distance_metric_cache, self.distance_metric, neighbor_provider) 

245 log.info(f"Using neighbor provider of type {self.knn_finder.__class__.__name__}") 

246 

247 def _predict_class_probabilities(self, x: pd.DataFrame): 

248 output_df = pd.DataFrame({label: np.nan for label in self._labels}, index=x.index) 

249 for nt in x.itertuples(): 

250 neighbors = self.find_neighbors(nt) 

251 probabilities = self._predict_class_probability_vector_from_neighbors(neighbors) 

252 output_df.loc[nt.Index] = probabilities 

253 return output_df 

254 

255 def _predict_class_probability_vector_from_neighbors(self, neighbors: List['Neighbor']): 

256 weights = collections.defaultdict(lambda: 0) 

257 total = 0 

258 for neigh in neighbors: 

259 if self.distance_based_weighting: 

260 weight = 1.0 / (neigh.distance + self.distance_epsilon) 

261 else: 

262 weight = 1 

263 weights[self._get_label(neigh)] += weight 

264 total += weight 

265 return [weights[label] / total for label in self._labels] 

266 

267 def _get_label(self, neighbor: 'Neighbor'): 

268 return self.y.iloc[:, 0].loc[neighbor.identifier] 

269 

270 def find_neighbors(self, named_tuple): 

271 return self.knn_finder.find_neighbors(named_tuple, self.num_neighbors) 

272 

273 

274class KNearestNeighboursRegressionModel(VectorRegressionModel): 

275 def __init__(self, num_neighbors: int, distance_metric: DistanceMetric, 

276 neighbor_provider_factory: Callable[[pd.DataFrame], NeighborProvider] = AllNeighborsProvider, 

277 distance_based_weighting=False, distance_epsilon=1e-3, 

278 distance_metric_cache: CachingKNearestNeighboursFinder.DistanceMetricCache = None, **kwargs): 

279 """ 

280 :param num_neighbors: the number of nearest neighbors to consider 

281 :param distance_metric: the distance metric to use 

282 :param neighbor_provider_factory: a factory with which a neighbor provider can be constructed using data 

283 :param distance_based_weighting: whether to weight neighbors according to their distance (inverse); if False, use democratic vote 

284 :param distance_epsilon: a distance that is added to all distances for distance-based weighting (in order to avoid 0 distances); 

285 :param distance_metric_cache: a cache for distance metrics which shall be used to store speed up repeated computations 

286 of the neighbors of the same data point by keeping series of distances cached (particularly for composite distance metrics); 

287 see class CachingKNearestNeighboursFinder 

288 :param kwargs: parameters to pass on to super-classes 

289 """ 

290 super().__init__(**kwargs) 

291 self.distance_epsilon = distance_epsilon 

292 self.distance_based_weighting = distance_based_weighting 

293 self.neighbor_provider_factory = neighbor_provider_factory 

294 self.num_neighbors = num_neighbors 

295 self.distance_metric = distance_metric 

296 self.distance_metric_cache = distance_metric_cache 

297 self.df = None 

298 self.y = None 

299 self.knn_finder = None 

300 

301 def _tostring_excludes(self) -> List[str]: 

302 return super()._tostring_excludes() + ["neighbor_provider_factory", "distance_metric", "distance_metric_cache", "df", "y"] 

303 

304 # noinspection DuplicatedCode 

305 def _fit(self, x: pd.DataFrame, y: pd.DataFrame): 

306 assert len(y.columns) == 1, "Expected exactly one column in label set Y" 

307 self.df = x.merge(y, how="inner", left_index=True, right_index=True) 

308 self.y = y 

309 neighbor_provider = self.neighbor_provider_factory(self.df) 

310 if self.distance_metric_cache is None: 

311 self.knn_finder = KNearestNeighboursFinder(self.distance_metric, neighbor_provider) 

312 else: 

313 self.knn_finder = CachingKNearestNeighboursFinder(self.distance_metric_cache, self.distance_metric, neighbor_provider) 

314 log.info(f"Using neighbor provider of type {self.knn_finder.__class__.__name__}") 

315 

316 def _get_target(self, neighbor: Neighbor): 

317 return self.y.iloc[:, 0].loc[neighbor.identifier] 

318 

319 def _predict_single_input(self, named_tuple): 

320 neighbors = self.knn_finder.find_neighbors(named_tuple, self.num_neighbors) 

321 neighbor_targets = np.array([self._get_target(n) for n in neighbors]) 

322 if self.distance_based_weighting: 

323 neighbor_weights = np.array([1.0 / (n.distance + self.distance_epsilon) for n in neighbors]) 

324 return np.sum(neighbor_targets * neighbor_weights) / np.sum(neighbor_weights) 

325 else: 

326 return np.mean(neighbor_targets) 

327 

328 def _predict(self, x: pd.DataFrame) -> pd.DataFrame: 

329 predicted_values = [] 

330 for i, nt in enumerate(x.itertuples()): 

331 predicted_values.append(self._predict_single_input(nt)) 

332 return pd.DataFrame({self._predictedVariableNames[0]: predicted_values}, index=x.index) 

333 

334 

335class FeatureGeneratorNeighbors(FeatureGeneratorFromNamedTuples): 

336 """ 

337 Generates features based on nearest neighbors. For each neighbor, a set of features is added to the output data frame. 

338 Each feature has the name "n{0-based neighbor index}_{feature name}", where the feature names are configurable 

339 at construction. The feature name "distance", which indicates the distance of the neighbor to the data point is 

340 always present. 

341 """ 

342 def __init__(self, num_neighbors: int, 

343 neighbor_attributes: typing.List[str], 

344 distance_metric: DistanceMetric, 

345 neighbor_provider_factory: typing.Callable[[pd.DataFrame], NeighborProvider] = AllNeighborsProvider, 

346 cache: util.cache.KeyValueCache = None, 

347 categorical_feature_names: typing.Sequence[str] = (), 

348 normalisation_rules: typing.Sequence[data_transformation.DFTNormalisation.Rule] = ()): 

349 """ 

350 :param num_neighbors: the number of neighbors for to generate features 

351 :param neighbor_attributes: the attributes of the neighbor's named tuple to include as features (in addition to "distance") 

352 :param distance_metric: the distance metric defining which neighbors are near 

353 :param neighbor_provider_factory: a factory for the creation of neighbor provider 

354 :param cache: an optional key-value cache in which feature values are stored by data point identifier (as given by the DataFrame's 

355 index) 

356 """ 

357 super().__init__(cache=cache, categorical_feature_names=categorical_feature_names, normalisation_rules=normalisation_rules) 

358 self.neighbor_attributes = neighbor_attributes 

359 self.distance_metric = distance_metric 

360 self.neighbor_provider_factory = neighbor_provider_factory 

361 self.num_neighbors = num_neighbors 

362 self._knn_finder: Optional[KNearestNeighboursFinder] = None 

363 self._train_x = None 

364 

365 def _generate(self, df: pd.DataFrame, ctx=None): 

366 if self._train_x is None: 

367 raise Exception("Feature generator has not been fitted") 

368 neighbor_provider = self.neighbor_provider_factory(self._train_x) 

369 self._knn_finder = KNearestNeighboursFinder(self.distance_metric, neighbor_provider) 

370 return super()._generate(df, ctx) 

371 

372 def _generate_feature_dict(self, named_tuple) -> typing.Dict[str, typing.Any]: 

373 neighbors = self._knn_finder.find_neighbors(named_tuple, self.num_neighbors) 

374 result = {} 

375 for i, neighbor in enumerate(neighbors): 

376 result[f"n{i}_distance"] = neighbor.distance 

377 for attr in self.neighbor_attributes: 

378 result[f"n{i}_{attr}"] = getattr(neighbor.value, attr) 

379 return result 

380 

381 def _fit(self, x: pd.DataFrame, y: pd.DataFrame = None, ctx=None): 

382 self._train_x = x