Coverage for src/sensai/nearest_neighbors.py: 29%
251 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-29 18:29 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-29 18:29 +0000
1import collections
2import datetime
3import logging
4import typing
5from abc import ABC, abstractmethod
6from typing import Callable, List, Iterable, Optional
8import numpy as np
9import pandas as pd
11from . import util, data_transformation
12from .distance_metric import DistanceMetric
13from .featuregen import FeatureGeneratorFromNamedTuples
14from .util.string import object_repr
15from .util.typing import PandasNamedTuple
16from .vector_model import VectorClassificationModel, VectorRegressionModel
18log = logging.getLogger(__name__)
21class Neighbor:
22 def __init__(self, value: PandasNamedTuple, distance: float):
23 self.distance = distance
24 self.value = value
25 self.identifier = value.Index
28class NeighborProvider(ABC):
29 def __init__(self, df_indexed_by_id: pd.DataFrame):
30 self.df = df_indexed_by_id
31 self.index = self.df.index
32 if any(self.index.duplicated()):
33 raise Exception("Dataframe index should not contain duplicates")
34 self.index_position_dict = {idx: pos for pos, idx in enumerate(self.index)}
36 @abstractmethod
37 def iter_potential_neighbors(self, value: PandasNamedTuple) -> Iterable[PandasNamedTuple]:
38 pass
40 @abstractmethod
41 def __str__(self):
42 return super().__str__()
45class AllNeighborsProvider(NeighborProvider):
46 def __init__(self, df_indexed_by_id: pd.DataFrame):
47 super().__init__(df_indexed_by_id)
48 self.named_tuples = None
50 def __getstate__(self):
51 d = self.__dict__.copy()
52 d["namedTuples"] = None
53 return d
55 def iter_potential_neighbors(self, value):
56 identifier = value.Index
57 if self.named_tuples is None:
58 self.named_tuples = list(self.df.itertuples())
59 for nt in self.named_tuples:
60 if nt.Index != identifier:
61 yield nt
63 def __str__(self):
64 return str(self.__class__.__name__)
67class TimerangeNeighborsProvider(NeighborProvider):
68 def __init__(self, df_indexed_by_id: pd.DataFrame, timestamps_column="timestamps",
69 past_time_range_days=120, future_time_range_days=120):
70 super().__init__(df_indexed_by_id)
71 if not pd.core.dtypes.common.is_datetime64_any_dtype(self.df[timestamps_column]):
72 raise Exception(f"Column {timestamps_column} does not have a compatible datatype")
73 self.timestamps_column = timestamps_column
74 self.past_time_range_days = past_time_range_days
75 self.future_time_range_days = future_time_range_days
76 self.past_timedelta = datetime.timedelta(days=past_time_range_days)
77 self.future_timedelta = datetime.timedelta(days=future_time_range_days)
79 def iter_potential_neighbors(self, value: PandasNamedTuple):
80 identifier = value.Index
81 input_time = getattr(value, self.timestamps_column)
82 max_time, min_time = input_time + self.future_timedelta, input_time - self.past_timedelta
83 neighbors_df = self.df[
84 self.df[self.timestamps_column].apply(lambda time: min_time < time < input_time)
85 ]
86 if identifier in neighbors_df.index:
87 neighbors_df.drop(identifier, inplace=True)
88 return neighbors_df.itertuples()
90 def __str__(self):
91 return object_repr(self, ["past_time_range_days", "future_time_range_days"])
94class AbstractKnnFinder(ABC):
95 @abstractmethod
96 def find_neighbors(self, named_tuple: PandasNamedTuple, n_neighbors=20) -> List[Neighbor]:
97 pass
99 @abstractmethod
100 def __str__(self):
101 super().__str__()
104class CachingKNearestNeighboursFinder(AbstractKnnFinder):
105 """
106 A nearest neighbor finder which uses a cache for distance metrics in order speed up repeated computations
107 of the neighbors of the same data point by keeping a pandas.Series of distances to all provided
108 data points cached. If the distance metric is of the composite type LinearCombinationDistanceMetric,
109 its component distance metrics are cached, such that weights in the linear combination can be varied
110 without necessitating recomputations.
111 """
112 log = log.getChild(__qualname__)
114 def __init__(self, cache: 'CachingKNearestNeighboursFinder.DistanceMetricCache', distance_metric: DistanceMetric,
115 neighbor_provider: NeighborProvider):
116 self.neighbor_provider = neighbor_provider
117 # This field is purely for logging purposes
118 self.distance_metric = distance_metric
119 if isinstance(distance_metric, distance_metric.LinearCombinationDistanceMetric):
120 self.weighted_distance_metrics = [(cache.get_cached_metric(dm), w) for (w, dm) in distance_metric.metrics]
121 else:
122 self.weighted_distance_metrics = [(cache.get_cached_metric(distance_metric), 1)]
124 def __str__(self):
125 return object_repr(self, ["neighbor_provider", "distance_metric"])
127 class DistanceMetricCache:
128 """
129 A cache for distance metrics which identifies equivalent distance metrics by their string representations.
130 The cache can be passed (consecutively) to multiple KNN models in order to speed up computations for the
131 same test data points. If the cache is reused, it is assumed that the neighbor provider remains the same.
132 """
133 log = log.getChild(__qualname__)
135 def __init__(self):
136 self._cached_metrics_by_name = {}
138 def get_cached_metric(self, distance_metric):
139 key = str(distance_metric)
140 cached_metric = self._cached_metrics_by_name.get(key)
141 if cached_metric is None:
142 self.log.info(f"Creating new cached metric for key '{key}'")
143 cached_metric = CachingKNearestNeighboursFinder.CachedSeriesDistanceMetric(distance_metric)
144 self._cached_metrics_by_name[key] = cached_metric
145 else:
146 self.log.info(f"Reusing cached metric for key '{key}'")
147 return cached_metric
149 class CachedSeriesDistanceMetric:
150 """
151 Provides caching for a wrapped distance metric: the series of all distances to provided potential neighbors
152 are retained in a cache
153 """
154 def __init__(self, distance_metric):
155 self.distance_metric = distance_metric
156 self.cache = {}
158 def get_distance_series(self, named_tuple: PandasNamedTuple, potential_neighbor_values):
159 identifier = named_tuple.Index
160 series = self.cache.get(identifier)
161 if series is None:
162 distances = []
163 for neighborTuple in potential_neighbor_values:
164 distances.append(self.distance_metric.distance(named_tuple, neighborTuple))
165 series = pd.Series(distances)
166 self.cache[identifier] = series
167 return series
169 def find_neighbors(self, named_tuple: PandasNamedTuple, n_neighbors=20) -> List[Neighbor]:
170 potential_neighbors = list(self.neighbor_provider.iter_potential_neighbors(named_tuple))
171 summed_distance_series = None
172 for i, (metric, weight) in enumerate(self.weighted_distance_metrics):
173 weighted_distances_series = metric.get_distance_series(named_tuple, potential_neighbors) * weight
174 if i == 0:
175 summed_distance_series = weighted_distances_series.copy()
176 else:
177 summed_distance_series += weighted_distances_series
178 summed_distance_series.sort_values(ascending=True, inplace=True)
179 result = []
180 for i in range(n_neighbors):
181 neighbor_tuple = potential_neighbors[summed_distance_series.index[i]]
182 distance = summed_distance_series.iloc[i]
183 result.append(Neighbor(neighbor_tuple, distance))
184 return result
187class KNearestNeighboursFinder(AbstractKnnFinder):
188 def __init__(self, distance_metric: DistanceMetric, neighbor_provider: NeighborProvider):
189 self.neighbor_provider = neighbor_provider
190 self.distance_metric = distance_metric
192 def __str__(self):
193 return object_repr(self, ["neighbor_provider", "distance_metric"])
195 def find_neighbors(self, named_tuple: PandasNamedTuple, n_neighbors=20) -> List[Neighbor]:
196 result = []
197 log.debug(f"Finding neighbors for {named_tuple.Index}")
198 for neighborTuple in self.neighbor_provider.iter_potential_neighbors(named_tuple):
199 distance = self.distance_metric.distance(named_tuple, neighborTuple)
200 result.append(Neighbor(neighborTuple, distance))
201 result.sort(key=lambda n: n.distance)
202 return result[:n_neighbors]
205class KNearestNeighboursClassificationModel(VectorClassificationModel):
206 def __init__(self, num_neighbors: int, distance_metric: DistanceMetric,
207 neighbor_provider_factory: Callable[[pd.DataFrame], NeighborProvider] = AllNeighborsProvider,
208 distance_based_weighting=False, distance_epsilon=1e-3,
209 distance_metric_cache: CachingKNearestNeighboursFinder.DistanceMetricCache = None, **kwargs):
210 """
211 :param num_neighbors: the number of nearest neighbors to consider
212 :param distance_metric: the distance metric to use
213 :param neighbor_provider_factory: a factory with which a neighbor provider can be constructed using data
214 :param distance_based_weighting: whether to weight neighbors according to their distance (inverse); if False, use democratic vote
215 :param distance_epsilon: a distance that is added to all distances for distance-based weighting (in order to avoid 0 distances);
216 :param distance_metric_cache: a cache for distance metrics which shall be used to store speed up repeated computations
217 of the neighbors of the same data point by keeping series of distances cached (particularly for composite distance metrics);
218 see class CachingKNearestNeighboursFinder
219 :param kwargs: parameters to pass on to super-classes
220 """
221 super().__init__(**kwargs)
222 self.distance_epsilon = distance_epsilon
223 self.distance_based_weighting = distance_based_weighting
224 self.neighbor_provider_factory = neighbor_provider_factory
225 self.num_neighbors = num_neighbors
226 self.distance_metric = distance_metric
227 self.distance_metric_cache = distance_metric_cache
228 self.df = None
229 self.y = None
230 self.knn_finder = None
232 def _tostring_excludes(self) -> List[str]:
233 return super()._tostring_excludes() + ["neighbor_provider_factory", "distance_metric", "distance_metric_cache", "df", "y"]
235 # noinspection DuplicatedCode
236 def _fit_classifier(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None):
237 self._warn_sample_weights_unsupported(False, weights)
238 assert len(y.columns) == 1, "Expected exactly one column in label set Y"
239 self.df = x.merge(y, how="inner", left_index=True, right_index=True)
240 self.y = y
241 neighbor_provider = self.neighbor_provider_factory(self.df)
242 if self.distance_metric_cache is None:
243 self.knn_finder = KNearestNeighboursFinder(self.distance_metric, neighbor_provider)
244 else:
245 self.knn_finder = CachingKNearestNeighboursFinder(self.distance_metric_cache, self.distance_metric, neighbor_provider)
246 log.info(f"Using neighbor provider of type {self.knn_finder.__class__.__name__}")
248 def _predict_class_probabilities(self, x: pd.DataFrame):
249 output_df = pd.DataFrame({label: np.nan for label in self._labels}, index=x.index)
250 for nt in x.itertuples():
251 neighbors = self.find_neighbors(nt)
252 probabilities = self._predict_class_probability_vector_from_neighbors(neighbors)
253 output_df.loc[nt.Index] = probabilities
254 return output_df
256 def _predict_class_probability_vector_from_neighbors(self, neighbors: List['Neighbor']):
257 weights = collections.defaultdict(lambda: 0)
258 total = 0
259 for neigh in neighbors:
260 if self.distance_based_weighting:
261 weight = 1.0 / (neigh.distance + self.distance_epsilon)
262 else:
263 weight = 1
264 weights[self._get_label(neigh)] += weight
265 total += weight
266 return [weights[label] / total for label in self._labels]
268 def _get_label(self, neighbor: 'Neighbor'):
269 return self.y.iloc[:, 0].loc[neighbor.identifier]
271 def find_neighbors(self, named_tuple):
272 return self.knn_finder.find_neighbors(named_tuple, self.num_neighbors)
275class KNearestNeighboursRegressionModel(VectorRegressionModel):
276 def __init__(self, num_neighbors: int, distance_metric: DistanceMetric,
277 neighbor_provider_factory: Callable[[pd.DataFrame], NeighborProvider] = AllNeighborsProvider,
278 distance_based_weighting=False, distance_epsilon=1e-3,
279 distance_metric_cache: CachingKNearestNeighboursFinder.DistanceMetricCache = None, **kwargs):
280 """
281 :param num_neighbors: the number of nearest neighbors to consider
282 :param distance_metric: the distance metric to use
283 :param neighbor_provider_factory: a factory with which a neighbor provider can be constructed using data
284 :param distance_based_weighting: whether to weight neighbors according to their distance (inverse); if False, use democratic vote
285 :param distance_epsilon: a distance that is added to all distances for distance-based weighting (in order to avoid 0 distances);
286 :param distance_metric_cache: a cache for distance metrics which shall be used to store speed up repeated computations
287 of the neighbors of the same data point by keeping series of distances cached (particularly for composite distance metrics);
288 see class CachingKNearestNeighboursFinder
289 :param kwargs: parameters to pass on to super-classes
290 """
291 super().__init__(**kwargs)
292 self.distance_epsilon = distance_epsilon
293 self.distance_based_weighting = distance_based_weighting
294 self.neighbor_provider_factory = neighbor_provider_factory
295 self.num_neighbors = num_neighbors
296 self.distance_metric = distance_metric
297 self.distance_metric_cache = distance_metric_cache
298 self.df = None
299 self.y = None
300 self.knn_finder = None
302 def _tostring_excludes(self) -> List[str]:
303 return super()._tostring_excludes() + ["neighbor_provider_factory", "distance_metric", "distance_metric_cache", "df", "y"]
305 # noinspection DuplicatedCode
306 def _fit(self, x: pd.DataFrame, y: pd.DataFrame, weights: Optional[pd.Series] = None):
307 self._warn_sample_weights_unsupported(False, weights)
308 assert len(y.columns) == 1, "Expected exactly one column in label set Y"
309 self.df = x.merge(y, how="inner", left_index=True, right_index=True)
310 self.y = y
311 neighbor_provider = self.neighbor_provider_factory(self.df)
312 if self.distance_metric_cache is None:
313 self.knn_finder = KNearestNeighboursFinder(self.distance_metric, neighbor_provider)
314 else:
315 self.knn_finder = CachingKNearestNeighboursFinder(self.distance_metric_cache, self.distance_metric, neighbor_provider)
316 log.info(f"Using neighbor provider of type {self.knn_finder.__class__.__name__}")
318 def _get_target(self, neighbor: Neighbor):
319 return self.y.iloc[:, 0].loc[neighbor.identifier]
321 def _predict_single_input(self, named_tuple):
322 neighbors = self.knn_finder.find_neighbors(named_tuple, self.num_neighbors)
323 neighbor_targets = np.array([self._get_target(n) for n in neighbors])
324 if self.distance_based_weighting:
325 neighbor_weights = np.array([1.0 / (n.distance + self.distance_epsilon) for n in neighbors])
326 return np.sum(neighbor_targets * neighbor_weights) / np.sum(neighbor_weights)
327 else:
328 return np.mean(neighbor_targets)
330 def _predict(self, x: pd.DataFrame) -> pd.DataFrame:
331 predicted_values = []
332 for i, nt in enumerate(x.itertuples()):
333 predicted_values.append(self._predict_single_input(nt))
334 return pd.DataFrame({self._predictedVariableNames[0]: predicted_values}, index=x.index)
337class FeatureGeneratorNeighbors(FeatureGeneratorFromNamedTuples):
338 """
339 Generates features based on nearest neighbors. For each neighbor, a set of features is added to the output data frame.
340 Each feature has the name "n{0-based neighbor index}_{feature name}", where the feature names are configurable
341 at construction. The feature name "distance", which indicates the distance of the neighbor to the data point is
342 always present.
343 """
344 def __init__(self, num_neighbors: int,
345 neighbor_attributes: typing.List[str],
346 distance_metric: DistanceMetric,
347 neighbor_provider_factory: typing.Callable[[pd.DataFrame], NeighborProvider] = AllNeighborsProvider,
348 cache: util.cache.KeyValueCache = None,
349 categorical_feature_names: typing.Sequence[str] = (),
350 normalisation_rules: typing.Sequence[data_transformation.DFTNormalisation.Rule] = ()):
351 """
352 :param num_neighbors: the number of neighbors for to generate features
353 :param neighbor_attributes: the attributes of the neighbor's named tuple to include as features (in addition to "distance")
354 :param distance_metric: the distance metric defining which neighbors are near
355 :param neighbor_provider_factory: a factory for the creation of neighbor provider
356 :param cache: an optional key-value cache in which feature values are stored by data point identifier (as given by the DataFrame's
357 index)
358 """
359 super().__init__(cache=cache, categorical_feature_names=categorical_feature_names, normalisation_rules=normalisation_rules)
360 self.neighbor_attributes = neighbor_attributes
361 self.distance_metric = distance_metric
362 self.neighbor_provider_factory = neighbor_provider_factory
363 self.num_neighbors = num_neighbors
364 self._knn_finder: Optional[KNearestNeighboursFinder] = None
365 self._train_x = None
367 def _generate(self, df: pd.DataFrame, ctx=None):
368 if self._train_x is None:
369 raise Exception("Feature generator has not been fitted")
370 neighbor_provider = self.neighbor_provider_factory(self._train_x)
371 self._knn_finder = KNearestNeighboursFinder(self.distance_metric, neighbor_provider)
372 return super()._generate(df, ctx)
374 def _generate_feature_dict(self, named_tuple) -> typing.Dict[str, typing.Any]:
375 neighbors = self._knn_finder.find_neighbors(named_tuple, self.num_neighbors)
376 result = {}
377 for i, neighbor in enumerate(neighbors):
378 result[f"n{i}_distance"] = neighbor.distance
379 for attr in self.neighbor_attributes:
380 result[f"n{i}_{attr}"] = getattr(neighbor.value, attr)
381 return result
383 def _fit(self, x: pd.DataFrame, y: pd.DataFrame = None, ctx=None):
384 self._train_x = x