Coverage for src/sensai/nearest_neighbors.py: 29%

251 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-29 18:29 +0000

1import collections 

2import datetime 

3import logging 

4import typing 

5from abc import ABC, abstractmethod 

6from typing import Callable, List, Iterable, Optional 

7 

8import numpy as np 

9import pandas as pd 

10 

11from . import util, data_transformation 

12from .distance_metric import DistanceMetric 

13from .featuregen import FeatureGeneratorFromNamedTuples 

14from .util.string import object_repr 

15from .util.typing import PandasNamedTuple 

16from .vector_model import VectorClassificationModel, VectorRegressionModel 

17 

18log = logging.getLogger(__name__) 

19 

20 

21class Neighbor: 

22 def __init__(self, value: PandasNamedTuple, distance: float): 

23 self.distance = distance 

24 self.value = value 

25 self.identifier = value.Index 

26 

27 

28class NeighborProvider(ABC): 

29 def __init__(self, df_indexed_by_id: pd.DataFrame): 

30 self.df = df_indexed_by_id 

31 self.index = self.df.index 

32 if any(self.index.duplicated()): 

33 raise Exception("Dataframe index should not contain duplicates") 

34 self.index_position_dict = {idx: pos for pos, idx in enumerate(self.index)} 

35 

36 @abstractmethod 

37 def iter_potential_neighbors(self, value: PandasNamedTuple) -> Iterable[PandasNamedTuple]: 

38 pass 

39 

40 @abstractmethod 

41 def __str__(self): 

42 return super().__str__() 

43 

44 

45class AllNeighborsProvider(NeighborProvider): 

46 def __init__(self, df_indexed_by_id: pd.DataFrame): 

47 super().__init__(df_indexed_by_id) 

48 self.named_tuples = None 

49 

50 def __getstate__(self): 

51 d = self.__dict__.copy() 

52 d["namedTuples"] = None 

53 return d 

54 

55 def iter_potential_neighbors(self, value): 

56 identifier = value.Index 

57 if self.named_tuples is None: 

58 self.named_tuples = list(self.df.itertuples()) 

59 for nt in self.named_tuples: 

60 if nt.Index != identifier: 

61 yield nt 

62 

63 def __str__(self): 

64 return str(self.__class__.__name__) 

65 

66 

67class TimerangeNeighborsProvider(NeighborProvider): 

68 def __init__(self, df_indexed_by_id: pd.DataFrame, timestamps_column="timestamps", 

69 past_time_range_days=120, future_time_range_days=120): 

70 super().__init__(df_indexed_by_id) 

71 if not pd.core.dtypes.common.is_datetime64_any_dtype(self.df[timestamps_column]): 

72 raise Exception(f"Column {timestamps_column} does not have a compatible datatype") 

73 self.timestamps_column = timestamps_column 

74 self.past_time_range_days = past_time_range_days 

75 self.future_time_range_days = future_time_range_days 

76 self.past_timedelta = datetime.timedelta(days=past_time_range_days) 

77 self.future_timedelta = datetime.timedelta(days=future_time_range_days) 

78 

79 def iter_potential_neighbors(self, value: PandasNamedTuple): 

80 identifier = value.Index 

81 input_time = getattr(value, self.timestamps_column) 

82 max_time, min_time = input_time + self.future_timedelta, input_time - self.past_timedelta 

83 neighbors_df = self.df[ 

84 self.df[self.timestamps_column].apply(lambda time: min_time < time < input_time) 

85 ] 

86 if identifier in neighbors_df.index: 

87 neighbors_df.drop(identifier, inplace=True) 

88 return neighbors_df.itertuples() 

89 

90 def __str__(self): 

91 return object_repr(self, ["past_time_range_days", "future_time_range_days"]) 

92 

93 

94class AbstractKnnFinder(ABC): 

95 @abstractmethod 

96 def find_neighbors(self, named_tuple: PandasNamedTuple, n_neighbors=20) -> List[Neighbor]: 

97 pass 

98 

99 @abstractmethod 

100 def __str__(self): 

101 super().__str__() 

102 

103 

104class CachingKNearestNeighboursFinder(AbstractKnnFinder): 

105 """ 

106 A nearest neighbor finder which uses a cache for distance metrics in order speed up repeated computations 

107 of the neighbors of the same data point by keeping a pandas.Series of distances to all provided 

108 data points cached. If the distance metric is of the composite type LinearCombinationDistanceMetric, 

109 its component distance metrics are cached, such that weights in the linear combination can be varied 

110 without necessitating recomputations. 

111 """ 

112 log = log.getChild(__qualname__) 

113 

114 def __init__(self, cache: 'CachingKNearestNeighboursFinder.DistanceMetricCache', distance_metric: DistanceMetric, 

115 neighbor_provider: NeighborProvider): 

116 self.neighbor_provider = neighbor_provider 

117 # This field is purely for logging purposes 

118 self.distance_metric = distance_metric 

119 if isinstance(distance_metric, distance_metric.LinearCombinationDistanceMetric): 

120 self.weighted_distance_metrics = [(cache.get_cached_metric(dm), w) for (w, dm) in distance_metric.metrics] 

121 else: 

122 self.weighted_distance_metrics = [(cache.get_cached_metric(distance_metric), 1)] 

123 

124 def __str__(self): 

125 return object_repr(self, ["neighbor_provider", "distance_metric"]) 

126 

127 class DistanceMetricCache: 

128 """ 

129 A cache for distance metrics which identifies equivalent distance metrics by their string representations. 

130 The cache can be passed (consecutively) to multiple KNN models in order to speed up computations for the 

131 same test data points. If the cache is reused, it is assumed that the neighbor provider remains the same. 

132 """ 

133 log = log.getChild(__qualname__) 

134 

135 def __init__(self): 

136 self._cached_metrics_by_name = {} 

137 

138 def get_cached_metric(self, distance_metric): 

139 key = str(distance_metric) 

140 cached_metric = self._cached_metrics_by_name.get(key) 

141 if cached_metric is None: 

142 self.log.info(f"Creating new cached metric for key '{key}'") 

143 cached_metric = CachingKNearestNeighboursFinder.CachedSeriesDistanceMetric(distance_metric) 

144 self._cached_metrics_by_name[key] = cached_metric 

145 else: 

146 self.log.info(f"Reusing cached metric for key '{key}'") 

147 return cached_metric 

148 

149 class CachedSeriesDistanceMetric: 

150 """ 

151 Provides caching for a wrapped distance metric: the series of all distances to provided potential neighbors 

152 are retained in a cache 

153 """ 

154 def __init__(self, distance_metric): 

155 self.distance_metric = distance_metric 

156 self.cache = {} 

157 

158 def get_distance_series(self, named_tuple: PandasNamedTuple, potential_neighbor_values): 

159 identifier = named_tuple.Index 

160 series = self.cache.get(identifier) 

161 if series is None: 

162 distances = [] 

163 for neighborTuple in potential_neighbor_values: 

164 distances.append(self.distance_metric.distance(named_tuple, neighborTuple)) 

165 series = pd.Series(distances) 

166 self.cache[identifier] = series 

167 return series 

168 

169 def find_neighbors(self, named_tuple: PandasNamedTuple, n_neighbors=20) -> List[Neighbor]: 

170 potential_neighbors = list(self.neighbor_provider.iter_potential_neighbors(named_tuple)) 

171 summed_distance_series = None 

172 for i, (metric, weight) in enumerate(self.weighted_distance_metrics): 

173 weighted_distances_series = metric.get_distance_series(named_tuple, potential_neighbors) * weight 

174 if i == 0: 

175 summed_distance_series = weighted_distances_series.copy() 

176 else: 

177 summed_distance_series += weighted_distances_series 

178 summed_distance_series.sort_values(ascending=True, inplace=True) 

179 result = [] 

180 for i in range(n_neighbors): 

181 neighbor_tuple = potential_neighbors[summed_distance_series.index[i]] 

182 distance = summed_distance_series.iloc[i] 

183 result.append(Neighbor(neighbor_tuple, distance)) 

184 return result 

185 

186 

187class KNearestNeighboursFinder(AbstractKnnFinder): 

188 def __init__(self, distance_metric: DistanceMetric, neighbor_provider: NeighborProvider): 

189 self.neighbor_provider = neighbor_provider 

190 self.distance_metric = distance_metric 

191 

192 def __str__(self): 

193 return object_repr(self, ["neighbor_provider", "distance_metric"]) 

194 

195 def find_neighbors(self, named_tuple: PandasNamedTuple, n_neighbors=20) -> List[Neighbor]: 

196 result = [] 

197 log.debug(f"Finding neighbors for {named_tuple.Index}") 

198 for neighborTuple in self.neighbor_provider.iter_potential_neighbors(named_tuple): 

199 distance = self.distance_metric.distance(named_tuple, neighborTuple) 

200 result.append(Neighbor(neighborTuple, distance)) 

201 result.sort(key=lambda n: n.distance) 

202 return result[:n_neighbors] 

203 

204 

205class KNearestNeighboursClassificationModel(VectorClassificationModel): 

206 def __init__(self, num_neighbors: int, distance_metric: DistanceMetric, 

207 neighbor_provider_factory: Callable[[pd.DataFrame], NeighborProvider] = AllNeighborsProvider, 

208 distance_based_weighting=False, distance_epsilon=1e-3, 

209 distance_metric_cache: CachingKNearestNeighboursFinder.DistanceMetricCache = None, **kwargs): 

210 """ 

211 :param num_neighbors: the number of nearest neighbors to consider 

212 :param distance_metric: the distance metric to use 

213 :param neighbor_provider_factory: a factory with which a neighbor provider can be constructed using data 

214 :param distance_based_weighting: whether to weight neighbors according to their distance (inverse); if False, use democratic vote 

215 :param distance_epsilon: a distance that is added to all distances for distance-based weighting (in order to avoid 0 distances); 

216 :param distance_metric_cache: a cache for distance metrics which shall be used to store speed up repeated computations 

217 of the neighbors of the same data point by keeping series of distances cached (particularly for composite distance metrics); 

218 see class CachingKNearestNeighboursFinder 

219 :param kwargs: parameters to pass on to super-classes 

220 """ 

221 super().__init__(**kwargs) 

222 self.distance_epsilon = distance_epsilon 

223 self.distance_based_weighting = distance_based_weighting 

224 self.neighbor_provider_factory = neighbor_provider_factory 

225 self.num_neighbors = num_neighbors 

226 self.distance_metric = distance_metric 

227 self.distance_metric_cache = distance_metric_cache 

228 self.df = None 

229 self.y = None 

230 self.knn_finder = None 

231 

232 def _tostring_excludes(self) -> List[str]: 

233 return super()._tostring_excludes() + ["neighbor_provider_factory", "distance_metric", "distance_metric_cache", "df", "y"] 

234 

235 # noinspection DuplicatedCode 

236 def _fit_classifier(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None): 

237 self._warn_sample_weights_unsupported(False, weights) 

238 assert len(y.columns) == 1, "Expected exactly one column in label set Y" 

239 self.df = x.merge(y, how="inner", left_index=True, right_index=True) 

240 self.y = y 

241 neighbor_provider = self.neighbor_provider_factory(self.df) 

242 if self.distance_metric_cache is None: 

243 self.knn_finder = KNearestNeighboursFinder(self.distance_metric, neighbor_provider) 

244 else: 

245 self.knn_finder = CachingKNearestNeighboursFinder(self.distance_metric_cache, self.distance_metric, neighbor_provider) 

246 log.info(f"Using neighbor provider of type {self.knn_finder.__class__.__name__}") 

247 

248 def _predict_class_probabilities(self, x: pd.DataFrame): 

249 output_df = pd.DataFrame({label: np.nan for label in self._labels}, index=x.index) 

250 for nt in x.itertuples(): 

251 neighbors = self.find_neighbors(nt) 

252 probabilities = self._predict_class_probability_vector_from_neighbors(neighbors) 

253 output_df.loc[nt.Index] = probabilities 

254 return output_df 

255 

256 def _predict_class_probability_vector_from_neighbors(self, neighbors: List['Neighbor']): 

257 weights = collections.defaultdict(lambda: 0) 

258 total = 0 

259 for neigh in neighbors: 

260 if self.distance_based_weighting: 

261 weight = 1.0 / (neigh.distance + self.distance_epsilon) 

262 else: 

263 weight = 1 

264 weights[self._get_label(neigh)] += weight 

265 total += weight 

266 return [weights[label] / total for label in self._labels] 

267 

268 def _get_label(self, neighbor: 'Neighbor'): 

269 return self.y.iloc[:, 0].loc[neighbor.identifier] 

270 

271 def find_neighbors(self, named_tuple): 

272 return self.knn_finder.find_neighbors(named_tuple, self.num_neighbors) 

273 

274 

275class KNearestNeighboursRegressionModel(VectorRegressionModel): 

276 def __init__(self, num_neighbors: int, distance_metric: DistanceMetric, 

277 neighbor_provider_factory: Callable[[pd.DataFrame], NeighborProvider] = AllNeighborsProvider, 

278 distance_based_weighting=False, distance_epsilon=1e-3, 

279 distance_metric_cache: CachingKNearestNeighboursFinder.DistanceMetricCache = None, **kwargs): 

280 """ 

281 :param num_neighbors: the number of nearest neighbors to consider 

282 :param distance_metric: the distance metric to use 

283 :param neighbor_provider_factory: a factory with which a neighbor provider can be constructed using data 

284 :param distance_based_weighting: whether to weight neighbors according to their distance (inverse); if False, use democratic vote 

285 :param distance_epsilon: a distance that is added to all distances for distance-based weighting (in order to avoid 0 distances); 

286 :param distance_metric_cache: a cache for distance metrics which shall be used to store speed up repeated computations 

287 of the neighbors of the same data point by keeping series of distances cached (particularly for composite distance metrics); 

288 see class CachingKNearestNeighboursFinder 

289 :param kwargs: parameters to pass on to super-classes 

290 """ 

291 super().__init__(**kwargs) 

292 self.distance_epsilon = distance_epsilon 

293 self.distance_based_weighting = distance_based_weighting 

294 self.neighbor_provider_factory = neighbor_provider_factory 

295 self.num_neighbors = num_neighbors 

296 self.distance_metric = distance_metric 

297 self.distance_metric_cache = distance_metric_cache 

298 self.df = None 

299 self.y = None 

300 self.knn_finder = None 

301 

302 def _tostring_excludes(self) -> List[str]: 

303 return super()._tostring_excludes() + ["neighbor_provider_factory", "distance_metric", "distance_metric_cache", "df", "y"] 

304 

305 # noinspection DuplicatedCode 

306 def _fit(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None): 

307 self._warn_sample_weights_unsupported(False, weights) 

308 assert len(y.columns) == 1, "Expected exactly one column in label set Y" 

309 self.df = x.merge(y, how="inner", left_index=True, right_index=True) 

310 self.y = y 

311 neighbor_provider = self.neighbor_provider_factory(self.df) 

312 if self.distance_metric_cache is None: 

313 self.knn_finder = KNearestNeighboursFinder(self.distance_metric, neighbor_provider) 

314 else: 

315 self.knn_finder = CachingKNearestNeighboursFinder(self.distance_metric_cache, self.distance_metric, neighbor_provider) 

316 log.info(f"Using neighbor provider of type {self.knn_finder.__class__.__name__}") 

317 

318 def _get_target(self, neighbor: Neighbor): 

319 return self.y.iloc[:, 0].loc[neighbor.identifier] 

320 

321 def _predict_single_input(self, named_tuple): 

322 neighbors = self.knn_finder.find_neighbors(named_tuple, self.num_neighbors) 

323 neighbor_targets = np.array([self._get_target(n) for n in neighbors]) 

324 if self.distance_based_weighting: 

325 neighbor_weights = np.array([1.0 / (n.distance + self.distance_epsilon) for n in neighbors]) 

326 return np.sum(neighbor_targets * neighbor_weights) / np.sum(neighbor_weights) 

327 else: 

328 return np.mean(neighbor_targets) 

329 

330 def _predict(self, x: pd.DataFrame) -> pd.DataFrame: 

331 predicted_values = [] 

332 for i, nt in enumerate(x.itertuples()): 

333 predicted_values.append(self._predict_single_input(nt)) 

334 return pd.DataFrame({self._predictedVariableNames[0]: predicted_values}, index=x.index) 

335 

336 

337class FeatureGeneratorNeighbors(FeatureGeneratorFromNamedTuples): 

338 """ 

339 Generates features based on nearest neighbors. For each neighbor, a set of features is added to the output data frame. 

340 Each feature has the name "n{0-based neighbor index}_{feature name}", where the feature names are configurable 

341 at construction. The feature name "distance", which indicates the distance of the neighbor to the data point is 

342 always present. 

343 """ 

344 def __init__(self, num_neighbors: int, 

345 neighbor_attributes: typing.List[str], 

346 distance_metric: DistanceMetric, 

347 neighbor_provider_factory: typing.Callable[[pd.DataFrame], NeighborProvider] = AllNeighborsProvider, 

348 cache: util.cache.KeyValueCache = None, 

349 categorical_feature_names: typing.Sequence[str] = (), 

350 normalisation_rules: typing.Sequence[data_transformation.DFTNormalisation.Rule] = ()): 

351 """ 

352 :param num_neighbors: the number of neighbors for to generate features 

353 :param neighbor_attributes: the attributes of the neighbor's named tuple to include as features (in addition to "distance") 

354 :param distance_metric: the distance metric defining which neighbors are near 

355 :param neighbor_provider_factory: a factory for the creation of neighbor provider 

356 :param cache: an optional key-value cache in which feature values are stored by data point identifier (as given by the DataFrame's 

357 index) 

358 """ 

359 super().__init__(cache=cache, categorical_feature_names=categorical_feature_names, normalisation_rules=normalisation_rules) 

360 self.neighbor_attributes = neighbor_attributes 

361 self.distance_metric = distance_metric 

362 self.neighbor_provider_factory = neighbor_provider_factory 

363 self.num_neighbors = num_neighbors 

364 self._knn_finder: Optional[KNearestNeighboursFinder] = None 

365 self._train_x = None 

366 

367 def _generate(self, df: pd.DataFrame, ctx=None): 

368 if self._train_x is None: 

369 raise Exception("Feature generator has not been fitted") 

370 neighbor_provider = self.neighbor_provider_factory(self._train_x) 

371 self._knn_finder = KNearestNeighboursFinder(self.distance_metric, neighbor_provider) 

372 return super()._generate(df, ctx) 

373 

374 def _generate_feature_dict(self, named_tuple) -> typing.Dict[str, typing.Any]: 

375 neighbors = self._knn_finder.find_neighbors(named_tuple, self.num_neighbors) 

376 result = {} 

377 for i, neighbor in enumerate(neighbors): 

378 result[f"n{i}_distance"] = neighbor.distance 

379 for attr in self.neighbor_attributes: 

380 result[f"n{i}_{attr}"] = getattr(neighbor.value, attr) 

381 return result 

382 

383 def _fit(self, x: pd.DataFrame, y: pd.DataFrame = None, ctx=None): 

384 self._train_x = x