Coverage for src/sensai/nearest_neighbors.py: 29%
249 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-13 22:17 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-13 22:17 +0000
1import collections
2import datetime
3import logging
4import typing
5from abc import ABC, abstractmethod
6from typing import Callable, List, Iterable, Optional
8import numpy as np
9import pandas as pd
11from . import util, data_transformation
12from .distance_metric import DistanceMetric
13from .featuregen import FeatureGeneratorFromNamedTuples
14from .util.string import object_repr
15from .util.typing import PandasNamedTuple
16from .vector_model import VectorClassificationModel, VectorRegressionModel
18log = logging.getLogger(__name__)
class Neighbor:
    """
    Pairs a data point (given as a named tuple) with a distance value; used as the result
    item type of the nearest-neighbor finders.
    """
    def __init__(self, value: PandasNamedTuple, distance: float):
        """
        :param value: the named tuple representing the neighboring data point; its ``Index`` attribute
            is stored as the neighbor's identifier
        :param distance: the distance associated with the neighbor
        """
        self.identifier = value.Index
        self.value = value
        self.distance = distance
class NeighborProvider(ABC):
    """
    Base class for objects which, for a given data point, supply the data points that are to be
    considered as its potential neighbors.
    """
    def __init__(self, df_indexed_by_id: pd.DataFrame):
        """
        :param df_indexed_by_id: the data frame containing the candidate data points; its index
            must not contain duplicates
        """
        self.df = df_indexed_by_id
        self.index = self.df.index
        has_duplicates = self.index.duplicated().any()
        if has_duplicates:
            raise Exception("Dataframe index should not contain duplicates")
        # maps each index value to its position within the data frame
        self.index_position_dict = dict(zip(self.index, range(len(self.index))))

    @abstractmethod
    def iter_potential_neighbors(self, value: PandasNamedTuple) -> Iterable[PandasNamedTuple]:
        """
        :param value: the data point for which potential neighbors are requested
        :return: an iterable of the named tuples of potential neighbors
        """
        pass

    @abstractmethod
    def __str__(self):
        return super().__str__()
class AllNeighborsProvider(NeighborProvider):
    """
    A neighbor provider which regards every data point of the data frame, except the query
    data point itself (identified via its index value), as a potential neighbor.
    """
    def __init__(self, df_indexed_by_id: pd.DataFrame):
        """
        :param df_indexed_by_id: the data frame containing the candidate data points (index without duplicates)
        """
        super().__init__(df_indexed_by_id)
        # lazily created list of the data frame's rows as named tuples (see iter_potential_neighbors)
        self.named_tuples = None

    def __getstate__(self):
        """
        :return: the instance state with the lazily created row list cleared, as it can be
            rebuilt from the data frame on demand
        """
        state = self.__dict__.copy()
        # the key must match the attribute name; otherwise the row list is persisted after all
        state["named_tuples"] = None
        return state

    def iter_potential_neighbors(self, value):
        """
        :param value: the named tuple of the data point for which to provide potential neighbors
        :return: a generator of the named tuples of all rows whose index value differs from that of the given value
        """
        identifier = value.Index
        if self.named_tuples is None:
            self.named_tuples = list(self.df.itertuples())
        for nt in self.named_tuples:
            if nt.Index != identifier:
                yield nt

    def __str__(self):
        return str(self.__class__.__name__)
class TimerangeNeighborsProvider(NeighborProvider):
    """
    A neighbor provider which restricts the potential neighbors of a data point to those data points
    whose timestamp lies within a time window around the data point's own timestamp.
    """
    def __init__(self, df_indexed_by_id: pd.DataFrame, timestamps_column="timestamps",
            past_time_range_days=120, future_time_range_days=120):
        """
        :param df_indexed_by_id: the data frame containing the candidate data points (index without duplicates)
        :param timestamps_column: the name of the column containing the timestamps; must be of a datetime64 dtype
        :param past_time_range_days: the extent of the time window into the past (in days)
        :param future_time_range_days: the extent of the time window into the future (in days)
        :raises Exception: if the timestamps column does not have a datetime64 dtype
        """
        super().__init__(df_indexed_by_id)
        if not pd.api.types.is_datetime64_any_dtype(self.df[timestamps_column]):
            raise Exception(f"Column {timestamps_column} does not have a compatible datatype")
        self.timestamps_column = timestamps_column
        self.past_time_range_days = past_time_range_days
        self.future_time_range_days = future_time_range_days
        self.past_timedelta = datetime.timedelta(days=past_time_range_days)
        self.future_timedelta = datetime.timedelta(days=future_time_range_days)

    def iter_potential_neighbors(self, value: PandasNamedTuple):
        """
        :param value: the named tuple of the data point for which to provide potential neighbors;
            must have an attribute named like the timestamps column
        :return: an iterator of the named tuples of all rows whose timestamp lies strictly between
            (input time - past range) and (input time + future range), excluding the row
            with the same index value as the given data point
        """
        identifier = value.Index
        input_time = getattr(value, self.timestamps_column)
        min_time = input_time - self.past_timedelta
        max_time = input_time + self.future_timedelta
        timestamps = self.df[self.timestamps_column]
        # both bounds are exclusive; the upper bound is max_time, such that the future range takes effect
        neighbors_df = self.df[(timestamps > min_time) & (timestamps < max_time)]
        if identifier in neighbors_df.index:
            # non-inplace drop, because neighbors_df is a selection of self.df
            neighbors_df = neighbors_df.drop(identifier)
        return neighbors_df.itertuples()

    def __str__(self):
        return object_repr(self, ["past_time_range_days", "future_time_range_days"])
class AbstractKnnFinder(ABC):
    """
    Base class for finders which determine the nearest neighbors of a data point.
    """
    @abstractmethod
    def find_neighbors(self, named_tuple: PandasNamedTuple, n_neighbors=20) -> List[Neighbor]:
        """
        :param named_tuple: the data point for which to find neighbors
        :param n_neighbors: the number of neighbors to find
        :return: the list of neighbors
        """
        pass

    @abstractmethod
    def __str__(self):
        # return the value, such that subclasses delegating to this implementation obtain a string rather than None
        return super().__str__()
class CachingKNearestNeighboursFinder(AbstractKnnFinder):
    """
    A nearest neighbor finder which uses a cache for distance metrics in order to speed up repeated computations
    of the neighbors of the same data point by keeping a pandas.Series of distances to all provided
    data points cached. If the distance metric is of the composite type LinearCombinationDistanceMetric,
    its component distance metrics are cached, such that weights in the linear combination can be varied
    without necessitating recomputations.
    """
    log = log.getChild(__qualname__)

    def __init__(self, cache: 'CachingKNearestNeighboursFinder.DistanceMetricCache', distance_metric: DistanceMetric,
            neighbor_provider: NeighborProvider):
        """
        :param cache: the cache from which the cached (component) distance metrics are obtained
        :param distance_metric: the distance metric to use
        :param neighbor_provider: the provider of potential neighbors
        """
        # Imported locally under its own name, because the parameter `distance_metric` shadows
        # any module of that name within this function.
        # NOTE(review): assumes LinearCombinationDistanceMetric is defined in the sibling module
        # .distance_metric - confirm
        from .distance_metric import LinearCombinationDistanceMetric

        self.neighbor_provider = neighbor_provider
        # This field is purely for logging purposes
        self.distance_metric = distance_metric
        if isinstance(distance_metric, LinearCombinationDistanceMetric):
            self.weighted_distance_metrics = [(cache.get_cached_metric(dm), w) for (w, dm) in distance_metric.metrics]
        else:
            self.weighted_distance_metrics = [(cache.get_cached_metric(distance_metric), 1)]

    def __str__(self):
        return object_repr(self, ["neighbor_provider", "distance_metric"])

    class DistanceMetricCache:
        """
        A cache for distance metrics which identifies equivalent distance metrics by their string representations.
        The cache can be passed (consecutively) to multiple KNN models in order to speed up computations for the
        same test data points. If the cache is reused, it is assumed that the neighbor provider remains the same.
        """
        log = log.getChild(__qualname__)

        def __init__(self):
            self._cached_metrics_by_name = {}

        def get_cached_metric(self, distance_metric):
            """
            :param distance_metric: the distance metric for which to obtain the caching wrapper
            :return: the caching wrapper registered for the metric's string representation; it is
                created and registered if not yet present
            """
            key = str(distance_metric)
            cached_metric = self._cached_metrics_by_name.get(key)
            if cached_metric is None:
                self.log.info(f"Creating new cached metric for key '{key}'")
                cached_metric = CachingKNearestNeighboursFinder.CachedSeriesDistanceMetric(distance_metric)
                self._cached_metrics_by_name[key] = cached_metric
            else:
                self.log.info(f"Reusing cached metric for key '{key}'")
            return cached_metric

    class CachedSeriesDistanceMetric:
        """
        Provides caching for a wrapped distance metric: the series of all distances to provided potential neighbors
        are retained in a cache
        """
        def __init__(self, distance_metric):
            """
            :param distance_metric: the distance metric to wrap
            """
            self.distance_metric = distance_metric
            # maps a data point's identifier (index value) to the series of distances computed for it
            self.cache = {}

        def get_distance_series(self, named_tuple: PandasNamedTuple, potential_neighbor_values):
            """
            :param named_tuple: the data point for which to obtain distances
            :param potential_neighbor_values: the potential neighbors to which distances shall be computed
            :return: a series of distances where the i-th entry is the distance to the i-th potential neighbor
            """
            identifier = named_tuple.Index
            series = self.cache.get(identifier)
            if series is None:
                # The cache key is the identifier only, i.e. a cached series is reused regardless of
                # the potential neighbors passed in subsequent calls.
                distances = [self.distance_metric.distance(named_tuple, neighbor_tuple)
                    for neighbor_tuple in potential_neighbor_values]
                series = pd.Series(distances)
                self.cache[identifier] = series
            return series

    def find_neighbors(self, named_tuple: PandasNamedTuple, n_neighbors=20) -> List[Neighbor]:
        """
        :param named_tuple: the data point for which to find neighbors
        :param n_neighbors: the maximum number of neighbors to return
        :return: the neighbors with the smallest weighted sum of (component) distances in ascending order
            of distance; contains fewer than n_neighbors elements if fewer potential neighbors are available
        """
        potential_neighbors = list(self.neighbor_provider.iter_potential_neighbors(named_tuple))
        summed_distance_series = None
        for metric, weight in self.weighted_distance_metrics:
            # the multiplication creates a new series, so the cached series is never modified
            weighted_distances_series = metric.get_distance_series(named_tuple, potential_neighbors) * weight
            if summed_distance_series is None:
                summed_distance_series = weighted_distances_series
            else:
                summed_distance_series = summed_distance_series + weighted_distances_series
        summed_distance_series = summed_distance_series.sort_values(ascending=True)
        # slicing clamps to the number of available neighbors (no IndexError if there are too few)
        nearest = summed_distance_series.iloc[:max(n_neighbors, 0)]
        # the series' index labels are the positions within the list of potential neighbors
        return [Neighbor(potential_neighbors[position], distance) for position, distance in nearest.items()]
class KNearestNeighboursFinder(AbstractKnnFinder):
    """
    A nearest neighbor finder which computes the distance to every potential neighbor on each call
    (no caching).
    """
    def __init__(self, distance_metric: DistanceMetric, neighbor_provider: NeighborProvider):
        """
        :param distance_metric: the distance metric to use
        :param neighbor_provider: the provider of potential neighbors
        """
        self.neighbor_provider = neighbor_provider
        self.distance_metric = distance_metric

    def __str__(self):
        return object_repr(self, ["neighbor_provider", "distance_metric"])

    def find_neighbors(self, named_tuple: PandasNamedTuple, n_neighbors=20) -> List[Neighbor]:
        """
        :param named_tuple: the data point for which to find neighbors
        :param n_neighbors: the maximum number of neighbors to return
        :return: up to n_neighbors neighbors in ascending order of distance
        """
        log.debug(f"Finding neighbors for {named_tuple.Index}")
        candidates = self.neighbor_provider.iter_potential_neighbors(named_tuple)
        neighbors = [
            Neighbor(candidate, self.distance_metric.distance(named_tuple, candidate))
            for candidate in candidates
        ]
        ranked = sorted(neighbors, key=lambda n: n.distance)
        return ranked[:n_neighbors]
class KNearestNeighboursClassificationModel(VectorClassificationModel):
    """
    A classification model which derives class probabilities from the labels of a data point's nearest neighbors.
    """
    def __init__(self, num_neighbors: int, distance_metric: DistanceMetric,
            neighbor_provider_factory: Callable[[pd.DataFrame], NeighborProvider] = AllNeighborsProvider,
            distance_based_weighting=False, distance_epsilon=1e-3,
            distance_metric_cache: CachingKNearestNeighboursFinder.DistanceMetricCache = None, **kwargs):
        """
        :param num_neighbors: the number of nearest neighbors to consider
        :param distance_metric: the distance metric to use
        :param neighbor_provider_factory: a factory which creates a neighbor provider from a data frame
        :param distance_based_weighting: whether to weight neighbors by their inverse distance; if False,
            every neighbor has the same weight
        :param distance_epsilon: a value that is added to every distance for distance-based weighting
            (in order to avoid 0 distances)
        :param distance_metric_cache: an optional cache for distance metrics; if given, a
            CachingKNearestNeighboursFinder is used to find neighbors (see that class for details)
        :param kwargs: parameters to pass on to super-classes
        """
        super().__init__(**kwargs)
        self.distance_epsilon = distance_epsilon
        self.distance_based_weighting = distance_based_weighting
        self.neighbor_provider_factory = neighbor_provider_factory
        self.num_neighbors = num_neighbors
        self.distance_metric = distance_metric
        self.distance_metric_cache = distance_metric_cache
        # the following are set when the model is fitted
        self.df = None
        self.y = None
        self.knn_finder = None

    def _tostring_excludes(self) -> List[str]:
        own_excludes = ["neighbor_provider_factory", "distance_metric", "distance_metric_cache", "df", "y"]
        return super()._tostring_excludes() + own_excludes

    # noinspection DuplicatedCode
    def _fit_classifier(self, x: pd.DataFrame, y: pd.DataFrame):
        """
        Stores the training data and creates the neighbor finder.

        :param x: the input data frame
        :param y: the label data frame, which must have exactly one column
        """
        assert len(y.columns) == 1, "Expected exactly one column in label set Y"
        self.df = x.merge(y, how="inner", left_index=True, right_index=True)
        self.y = y
        provider = self.neighbor_provider_factory(self.df)
        cache = self.distance_metric_cache
        if cache is not None:
            self.knn_finder = CachingKNearestNeighboursFinder(cache, self.distance_metric, provider)
        else:
            self.knn_finder = KNearestNeighboursFinder(self.distance_metric, provider)
        log.info(f"Using neighbor provider of type {self.knn_finder.__class__.__name__}")

    def _predict_class_probabilities(self, x: pd.DataFrame):
        """
        :param x: the input data frame
        :return: a data frame with one column per label and the index of x, containing the class probabilities
        """
        output_df = pd.DataFrame(dict.fromkeys(self._labels, np.nan), index=x.index)
        for nt in x.itertuples():
            neighbors = self.find_neighbors(nt)
            output_df.loc[nt.Index] = self._predict_class_probability_vector_from_neighbors(neighbors)
        return output_df

    def _predict_class_probability_vector_from_neighbors(self, neighbors: List['Neighbor']):
        """
        :param neighbors: the neighbors from whose labels to derive the probabilities
        :return: the list of normalised label weights, ordered like the model's labels
        """
        weights = collections.defaultdict(int)
        total = 0
        for neigh in neighbors:
            weight = 1.0 / (neigh.distance + self.distance_epsilon) if self.distance_based_weighting else 1
            weights[self._get_label(neigh)] += weight
            total += weight
        return [weights[label] / total for label in self._labels]

    def _get_label(self, neighbor: 'Neighbor'):
        """
        :param neighbor: the neighbor whose label to retrieve
        :return: the value in the first column of the label data frame for the neighbor's identifier
        """
        label_series = self.y.iloc[:, 0]
        return label_series.loc[neighbor.identifier]

    def find_neighbors(self, named_tuple):
        """
        :param named_tuple: the data point for which to find neighbors
        :return: the neighbors found by the model's neighbor finder (at most num_neighbors)
        """
        return self.knn_finder.find_neighbors(named_tuple, self.num_neighbors)
class KNearestNeighboursRegressionModel(VectorRegressionModel):
    """
    A regression model which predicts the (optionally distance-weighted) mean of the target values of a
    data point's nearest neighbors.
    """
    def __init__(self, num_neighbors: int, distance_metric: DistanceMetric,
            neighbor_provider_factory: Callable[[pd.DataFrame], NeighborProvider] = AllNeighborsProvider,
            distance_based_weighting=False, distance_epsilon=1e-3,
            distance_metric_cache: CachingKNearestNeighboursFinder.DistanceMetricCache = None, **kwargs):
        """
        :param num_neighbors: the number of nearest neighbors to consider
        :param distance_metric: the distance metric to use
        :param neighbor_provider_factory: a factory which creates a neighbor provider from a data frame
        :param distance_based_weighting: whether to weight neighbors by their inverse distance; if False,
            every neighbor has the same weight
        :param distance_epsilon: a value that is added to every distance for distance-based weighting
            (in order to avoid 0 distances)
        :param distance_metric_cache: an optional cache for distance metrics; if given, a
            CachingKNearestNeighboursFinder is used to find neighbors (see that class for details)
        :param kwargs: parameters to pass on to super-classes
        """
        super().__init__(**kwargs)
        self.distance_epsilon = distance_epsilon
        self.distance_based_weighting = distance_based_weighting
        self.neighbor_provider_factory = neighbor_provider_factory
        self.num_neighbors = num_neighbors
        self.distance_metric = distance_metric
        self.distance_metric_cache = distance_metric_cache
        # the following are set when the model is fitted
        self.df = None
        self.y = None
        self.knn_finder = None

    def _tostring_excludes(self) -> List[str]:
        own_excludes = ["neighbor_provider_factory", "distance_metric", "distance_metric_cache", "df", "y"]
        return super()._tostring_excludes() + own_excludes

    # noinspection DuplicatedCode
    def _fit(self, x: pd.DataFrame, y: pd.DataFrame):
        """
        Stores the training data and creates the neighbor finder.

        :param x: the input data frame
        :param y: the target data frame, which must have exactly one column
        """
        assert len(y.columns) == 1, "Expected exactly one column in label set Y"
        self.df = x.merge(y, how="inner", left_index=True, right_index=True)
        self.y = y
        provider = self.neighbor_provider_factory(self.df)
        cache = self.distance_metric_cache
        if cache is not None:
            self.knn_finder = CachingKNearestNeighboursFinder(cache, self.distance_metric, provider)
        else:
            self.knn_finder = KNearestNeighboursFinder(self.distance_metric, provider)
        log.info(f"Using neighbor provider of type {self.knn_finder.__class__.__name__}")

    def _get_target(self, neighbor: Neighbor):
        """
        :param neighbor: the neighbor whose target value to retrieve
        :return: the value in the first column of the target data frame for the neighbor's identifier
        """
        target_series = self.y.iloc[:, 0]
        return target_series.loc[neighbor.identifier]

    def _predict_single_input(self, named_tuple):
        """
        :param named_tuple: the data point for which to compute a prediction
        :return: the mean of the neighbors' target values, weighted by inverse distance if
            distance-based weighting is enabled
        """
        neighbors = self.knn_finder.find_neighbors(named_tuple, self.num_neighbors)
        neighbor_targets = np.array([self._get_target(n) for n in neighbors])
        if not self.distance_based_weighting:
            return np.mean(neighbor_targets)
        neighbor_weights = np.array([1.0 / (n.distance + self.distance_epsilon) for n in neighbors])
        return np.sum(neighbor_targets * neighbor_weights) / np.sum(neighbor_weights)

    def _predict(self, x: pd.DataFrame) -> pd.DataFrame:
        """
        :param x: the input data frame
        :return: a single-column data frame with the index of x containing the predictions
        """
        predicted_values = [self._predict_single_input(nt) for nt in x.itertuples()]
        # NOTE(review): camelCase attribute, unlike the other attributes used in this file -
        # confirm that the base class defines it under this name
        column_name = self._predictedVariableNames[0]
        return pd.DataFrame({column_name: predicted_values}, index=x.index)
class FeatureGeneratorNeighbors(FeatureGeneratorFromNamedTuples):
    """
    Generates features based on nearest neighbors. For each neighbor, a set of features is added to the
    output data frame. Each feature is named "n{0-based neighbor index}_{feature name}", where the feature
    names are configurable at construction. The feature "distance", which holds the distance of the
    neighbor to the data point, is always present.
    """
    def __init__(self, num_neighbors: int,
            neighbor_attributes: typing.List[str],
            distance_metric: DistanceMetric,
            neighbor_provider_factory: typing.Callable[[pd.DataFrame], NeighborProvider] = AllNeighborsProvider,
            cache: util.cache.KeyValueCache = None,
            categorical_feature_names: typing.Sequence[str] = (),
            normalisation_rules: typing.Sequence[data_transformation.DFTNormalisation.Rule] = ()):
        """
        :param num_neighbors: the number of neighbors for which to generate features
        :param neighbor_attributes: the attributes of the neighbor's named tuple to include as features
            (in addition to "distance")
        :param distance_metric: the distance metric defining which neighbors are near
        :param neighbor_provider_factory: a factory for the creation of the neighbor provider
        :param cache: an optional key-value cache, which is passed on to the super-class
        :param categorical_feature_names: passed on to the super-class
        :param normalisation_rules: passed on to the super-class
        """
        super().__init__(cache=cache, categorical_feature_names=categorical_feature_names,
            normalisation_rules=normalisation_rules)
        self.neighbor_attributes = neighbor_attributes
        self.distance_metric = distance_metric
        self.neighbor_provider_factory = neighbor_provider_factory
        self.num_neighbors = num_neighbors
        # created anew upon every call to _generate
        self._knn_finder: Optional[KNearestNeighboursFinder] = None
        # the data frame passed to _fit, from which the neighbor provider is created
        self._train_x = None

    def _generate(self, df: pd.DataFrame, ctx=None):
        """
        Creates the neighbor finder from the training data and delegates the generation to the super-class.

        :param df: the data frame for which to generate features
        :param ctx: passed on to the super-class
        :raises Exception: if the feature generator has not been fitted
        """
        train_x = self._train_x
        if train_x is None:
            raise Exception("Feature generator has not been fitted")
        provider = self.neighbor_provider_factory(train_x)
        self._knn_finder = KNearestNeighboursFinder(self.distance_metric, provider)
        return super()._generate(df, ctx)

    def _generate_feature_dict(self, named_tuple) -> typing.Dict[str, typing.Any]:
        """
        :param named_tuple: the data point for which to generate features
        :return: a dictionary mapping feature names to values, containing the distance and the configured
            attributes of each neighbor found
        """
        features = {}
        neighbors = self._knn_finder.find_neighbors(named_tuple, self.num_neighbors)
        for i, neighbor in enumerate(neighbors):
            features[f"n{i}_distance"] = neighbor.distance
            features.update({f"n{i}_{attr}": getattr(neighbor.value, attr) for attr in self.neighbor_attributes})
        return features

    def _fit(self, x: pd.DataFrame, y: pd.DataFrame = None, ctx=None):
        """
        Retains the given input data frame as the data in which neighbors will be searched.

        :param x: the input data frame
        :param y: unused
        :param ctx: unused
        """
        self._train_x = x