import collections
import datetime
import logging
import typing
from abc import ABC, abstractmethod
from typing import Callable, List, Iterable, Optional
import numpy as np
import pandas as pd
from . import util, data_transformation
from .distance_metric import DistanceMetric
from .featuregen import FeatureGeneratorFromNamedTuples
from .util.string import object_repr
from .util.typing import PandasNamedTuple
from .vector_model import VectorClassificationModel, VectorRegressionModel
log = logging.getLogger(__name__)
[docs]class Neighbor:
def __init__(self, value: PandasNamedTuple, distance: float):
self.distance = distance
self.value = value
self.identifier = value.Index
[docs]class NeighborProvider(ABC):
def __init__(self, df_indexed_by_id: pd.DataFrame):
self.df = df_indexed_by_id
self.index = self.df.index
if any(self.index.duplicated()):
raise Exception("Dataframe index should not contain duplicates")
self.index_position_dict = {idx: pos for pos, idx in enumerate(self.index)}
[docs] @abstractmethod
def iter_potential_neighbors(self, value: PandasNamedTuple) -> Iterable[PandasNamedTuple]:
pass
@abstractmethod
def __str__(self):
return super().__str__()
[docs]class AllNeighborsProvider(NeighborProvider):
def __init__(self, df_indexed_by_id: pd.DataFrame):
super().__init__(df_indexed_by_id)
self.named_tuples = None
def __getstate__(self):
d = self.__dict__.copy()
d["namedTuples"] = None
return d
[docs] def iter_potential_neighbors(self, value):
identifier = value.Index
if self.named_tuples is None:
self.named_tuples = list(self.df.itertuples())
for nt in self.named_tuples:
if nt.Index != identifier:
yield nt
def __str__(self):
return str(self.__class__.__name__)
[docs]class TimerangeNeighborsProvider(NeighborProvider):
def __init__(self, df_indexed_by_id: pd.DataFrame, timestamps_column="timestamps",
past_time_range_days=120, future_time_range_days=120):
super().__init__(df_indexed_by_id)
if not pd.core.dtypes.common.is_datetime64_any_dtype(self.df[timestamps_column]):
raise Exception(f"Column {timestamps_column} does not have a compatible datatype")
self.timestamps_column = timestamps_column
self.past_time_range_days = past_time_range_days
self.future_time_range_days = future_time_range_days
self.past_timedelta = datetime.timedelta(days=past_time_range_days)
self.future_timedelta = datetime.timedelta(days=future_time_range_days)
[docs] def iter_potential_neighbors(self, value: PandasNamedTuple):
identifier = value.Index
input_time = getattr(value, self.timestamps_column)
max_time, min_time = input_time + self.future_timedelta, input_time - self.past_timedelta
neighbors_df = self.df[
self.df[self.timestamps_column].apply(lambda time: min_time < time < input_time)
]
if identifier in neighbors_df.index:
neighbors_df.drop(identifier, inplace=True)
return neighbors_df.itertuples()
def __str__(self):
return object_repr(self, ["past_time_range_days", "future_time_range_days"])
[docs]class AbstractKnnFinder(ABC):
[docs] @abstractmethod
def find_neighbors(self, named_tuple: PandasNamedTuple, n_neighbors=20) -> List[Neighbor]:
pass
@abstractmethod
def __str__(self):
super().__str__()
[docs]class CachingKNearestNeighboursFinder(AbstractKnnFinder):
"""
A nearest neighbor finder which uses a cache for distance metrics in order speed up repeated computations
of the neighbors of the same data point by keeping a pandas.Series of distances to all provided
data points cached. If the distance metric is of the composite type LinearCombinationDistanceMetric,
its component distance metrics are cached, such that weights in the linear combination can be varied
without necessitating recomputations.
"""
log = log.getChild(__qualname__)
def __init__(self, cache: 'CachingKNearestNeighboursFinder.DistanceMetricCache', distance_metric: DistanceMetric,
neighbor_provider: NeighborProvider):
self.neighbor_provider = neighbor_provider
# This field is purely for logging purposes
self.distance_metric = distance_metric
if isinstance(distance_metric, distance_metric.LinearCombinationDistanceMetric):
self.weighted_distance_metrics = [(cache.get_cached_metric(dm), w) for (w, dm) in distance_metric.metrics]
else:
self.weighted_distance_metrics = [(cache.get_cached_metric(distance_metric), 1)]
def __str__(self):
return object_repr(self, ["neighbor_provider", "distance_metric"])
[docs] class DistanceMetricCache:
"""
A cache for distance metrics which identifies equivalent distance metrics by their string representations.
The cache can be passed (consecutively) to multiple KNN models in order to speed up computations for the
same test data points. If the cache is reused, it is assumed that the neighbor provider remains the same.
"""
log = log.getChild(__qualname__)
def __init__(self):
self._cached_metrics_by_name = {}
[docs] def get_cached_metric(self, distance_metric):
key = str(distance_metric)
cached_metric = self._cached_metrics_by_name.get(key)
if cached_metric is None:
self.log.info(f"Creating new cached metric for key '{key}'")
cached_metric = CachingKNearestNeighboursFinder.CachedSeriesDistanceMetric(distance_metric)
self._cached_metrics_by_name[key] = cached_metric
else:
self.log.info(f"Reusing cached metric for key '{key}'")
return cached_metric
[docs] class CachedSeriesDistanceMetric:
"""
Provides caching for a wrapped distance metric: the series of all distances to provided potential neighbors
are retained in a cache
"""
def __init__(self, distance_metric):
self.distance_metric = distance_metric
self.cache = {}
[docs] def get_distance_series(self, named_tuple: PandasNamedTuple, potential_neighbor_values):
identifier = named_tuple.Index
series = self.cache.get(identifier)
if series is None:
distances = []
for neighborTuple in potential_neighbor_values:
distances.append(self.distance_metric.distance(named_tuple, neighborTuple))
series = pd.Series(distances)
self.cache[identifier] = series
return series
[docs] def find_neighbors(self, named_tuple: PandasNamedTuple, n_neighbors=20) -> List[Neighbor]:
potential_neighbors = list(self.neighbor_provider.iter_potential_neighbors(named_tuple))
summed_distance_series = None
for i, (metric, weight) in enumerate(self.weighted_distance_metrics):
weighted_distances_series = metric.get_distance_series(named_tuple, potential_neighbors) * weight
if i == 0:
summed_distance_series = weighted_distances_series.copy()
else:
summed_distance_series += weighted_distances_series
summed_distance_series.sort_values(ascending=True, inplace=True)
result = []
for i in range(n_neighbors):
neighbor_tuple = potential_neighbors[summed_distance_series.index[i]]
distance = summed_distance_series.iloc[i]
result.append(Neighbor(neighbor_tuple, distance))
return result
[docs]class KNearestNeighboursFinder(AbstractKnnFinder):
def __init__(self, distance_metric: DistanceMetric, neighbor_provider: NeighborProvider):
self.neighbor_provider = neighbor_provider
self.distance_metric = distance_metric
def __str__(self):
return object_repr(self, ["neighbor_provider", "distance_metric"])
[docs] def find_neighbors(self, named_tuple: PandasNamedTuple, n_neighbors=20) -> List[Neighbor]:
result = []
log.debug(f"Finding neighbors for {named_tuple.Index}")
for neighborTuple in self.neighbor_provider.iter_potential_neighbors(named_tuple):
distance = self.distance_metric.distance(named_tuple, neighborTuple)
result.append(Neighbor(neighborTuple, distance))
result.sort(key=lambda n: n.distance)
return result[:n_neighbors]
[docs]class KNearestNeighboursClassificationModel(VectorClassificationModel):
def __init__(self, num_neighbors: int, distance_metric: DistanceMetric,
neighbor_provider_factory: Callable[[pd.DataFrame], NeighborProvider] = AllNeighborsProvider,
distance_based_weighting=False, distance_epsilon=1e-3,
distance_metric_cache: CachingKNearestNeighboursFinder.DistanceMetricCache = None, **kwargs):
"""
:param num_neighbors: the number of nearest neighbors to consider
:param distance_metric: the distance metric to use
:param neighbor_provider_factory: a factory with which a neighbor provider can be constructed using data
:param distance_based_weighting: whether to weight neighbors according to their distance (inverse); if False, use democratic vote
:param distance_epsilon: a distance that is added to all distances for distance-based weighting (in order to avoid 0 distances);
:param distance_metric_cache: a cache for distance metrics which shall be used to store speed up repeated computations
of the neighbors of the same data point by keeping series of distances cached (particularly for composite distance metrics);
see class CachingKNearestNeighboursFinder
:param kwargs: parameters to pass on to super-classes
"""
super().__init__(**kwargs)
self.distance_epsilon = distance_epsilon
self.distance_based_weighting = distance_based_weighting
self.neighbor_provider_factory = neighbor_provider_factory
self.num_neighbors = num_neighbors
self.distance_metric = distance_metric
self.distance_metric_cache = distance_metric_cache
self.df = None
self.y = None
self.knn_finder = None
def _tostring_excludes(self) -> List[str]:
return super()._tostring_excludes() + ["neighbor_provider_factory", "distance_metric", "distance_metric_cache", "df", "y"]
# noinspection DuplicatedCode
def _fit_classifier(self, x: pd.DataFrame, y: pd.DataFrame):
assert len(y.columns) == 1, "Expected exactly one column in label set Y"
self.df = x.merge(y, how="inner", left_index=True, right_index=True)
self.y = y
neighbor_provider = self.neighbor_provider_factory(self.df)
if self.distance_metric_cache is None:
self.knn_finder = KNearestNeighboursFinder(self.distance_metric, neighbor_provider)
else:
self.knn_finder = CachingKNearestNeighboursFinder(self.distance_metric_cache, self.distance_metric, neighbor_provider)
log.info(f"Using neighbor provider of type {self.knn_finder.__class__.__name__}")
def _predict_class_probabilities(self, x: pd.DataFrame):
output_df = pd.DataFrame({label: np.nan for label in self._labels}, index=x.index)
for nt in x.itertuples():
neighbors = self.find_neighbors(nt)
probabilities = self._predict_class_probability_vector_from_neighbors(neighbors)
output_df.loc[nt.Index] = probabilities
return output_df
def _predict_class_probability_vector_from_neighbors(self, neighbors: List['Neighbor']):
weights = collections.defaultdict(lambda: 0)
total = 0
for neigh in neighbors:
if self.distance_based_weighting:
weight = 1.0 / (neigh.distance + self.distance_epsilon)
else:
weight = 1
weights[self._get_label(neigh)] += weight
total += weight
return [weights[label] / total for label in self._labels]
def _get_label(self, neighbor: 'Neighbor'):
return self.y.iloc[:, 0].loc[neighbor.identifier]
[docs] def find_neighbors(self, named_tuple):
return self.knn_finder.find_neighbors(named_tuple, self.num_neighbors)
[docs]class KNearestNeighboursRegressionModel(VectorRegressionModel):
def __init__(self, num_neighbors: int, distance_metric: DistanceMetric,
neighbor_provider_factory: Callable[[pd.DataFrame], NeighborProvider] = AllNeighborsProvider,
distance_based_weighting=False, distance_epsilon=1e-3,
distance_metric_cache: CachingKNearestNeighboursFinder.DistanceMetricCache = None, **kwargs):
"""
:param num_neighbors: the number of nearest neighbors to consider
:param distance_metric: the distance metric to use
:param neighbor_provider_factory: a factory with which a neighbor provider can be constructed using data
:param distance_based_weighting: whether to weight neighbors according to their distance (inverse); if False, use democratic vote
:param distance_epsilon: a distance that is added to all distances for distance-based weighting (in order to avoid 0 distances);
:param distance_metric_cache: a cache for distance metrics which shall be used to store speed up repeated computations
of the neighbors of the same data point by keeping series of distances cached (particularly for composite distance metrics);
see class CachingKNearestNeighboursFinder
:param kwargs: parameters to pass on to super-classes
"""
super().__init__(**kwargs)
self.distance_epsilon = distance_epsilon
self.distance_based_weighting = distance_based_weighting
self.neighbor_provider_factory = neighbor_provider_factory
self.num_neighbors = num_neighbors
self.distance_metric = distance_metric
self.distance_metric_cache = distance_metric_cache
self.df = None
self.y = None
self.knn_finder = None
def _tostring_excludes(self) -> List[str]:
return super()._tostring_excludes() + ["neighbor_provider_factory", "distance_metric", "distance_metric_cache", "df", "y"]
# noinspection DuplicatedCode
def _fit(self, x: pd.DataFrame, y: pd.DataFrame):
assert len(y.columns) == 1, "Expected exactly one column in label set Y"
self.df = x.merge(y, how="inner", left_index=True, right_index=True)
self.y = y
neighbor_provider = self.neighbor_provider_factory(self.df)
if self.distance_metric_cache is None:
self.knn_finder = KNearestNeighboursFinder(self.distance_metric, neighbor_provider)
else:
self.knn_finder = CachingKNearestNeighboursFinder(self.distance_metric_cache, self.distance_metric, neighbor_provider)
log.info(f"Using neighbor provider of type {self.knn_finder.__class__.__name__}")
def _get_target(self, neighbor: Neighbor):
return self.y.iloc[:, 0].loc[neighbor.identifier]
def _predict_single_input(self, named_tuple):
neighbors = self.knn_finder.find_neighbors(named_tuple, self.num_neighbors)
neighbor_targets = np.array([self._get_target(n) for n in neighbors])
if self.distance_based_weighting:
neighbor_weights = np.array([1.0 / (n.distance + self.distance_epsilon) for n in neighbors])
return np.sum(neighbor_targets * neighbor_weights) / np.sum(neighbor_weights)
else:
return np.mean(neighbor_targets)
def _predict(self, x: pd.DataFrame) -> pd.DataFrame:
predicted_values = []
for i, nt in enumerate(x.itertuples()):
predicted_values.append(self._predict_single_input(nt))
return pd.DataFrame({self._predictedVariableNames[0]: predicted_values}, index=x.index)
[docs]class FeatureGeneratorNeighbors(FeatureGeneratorFromNamedTuples):
"""
Generates features based on nearest neighbors. For each neighbor, a set of features is added to the output data frame.
Each feature has the name "n{0-based neighbor index}_{feature name}", where the feature names are configurable
at construction. The feature name "distance", which indicates the distance of the neighbor to the data point is
always present.
"""
def __init__(self, num_neighbors: int,
neighbor_attributes: typing.List[str],
distance_metric: DistanceMetric,
neighbor_provider_factory: typing.Callable[[pd.DataFrame], NeighborProvider] = AllNeighborsProvider,
cache: util.cache.KeyValueCache = None,
categorical_feature_names: typing.Sequence[str] = (),
normalisation_rules: typing.Sequence[data_transformation.DFTNormalisation.Rule] = ()):
"""
:param num_neighbors: the number of neighbors for to generate features
:param neighbor_attributes: the attributes of the neighbor's named tuple to include as features (in addition to "distance")
:param distance_metric: the distance metric defining which neighbors are near
:param neighbor_provider_factory: a factory for the creation of neighbor provider
:param cache: an optional key-value cache in which feature values are stored by data point identifier (as given by the DataFrame's
index)
"""
super().__init__(cache=cache, categorical_feature_names=categorical_feature_names, normalisation_rules=normalisation_rules)
self.neighbor_attributes = neighbor_attributes
self.distance_metric = distance_metric
self.neighbor_provider_factory = neighbor_provider_factory
self.num_neighbors = num_neighbors
self._knn_finder: Optional[KNearestNeighboursFinder] = None
self._train_x = None
def _generate(self, df: pd.DataFrame, ctx=None):
if self._train_x is None:
raise Exception("Feature generator has not been fitted")
neighbor_provider = self.neighbor_provider_factory(self._train_x)
self._knn_finder = KNearestNeighboursFinder(self.distance_metric, neighbor_provider)
return super()._generate(df, ctx)
def _generate_feature_dict(self, named_tuple) -> typing.Dict[str, typing.Any]:
neighbors = self._knn_finder.find_neighbors(named_tuple, self.num_neighbors)
result = {}
for i, neighbor in enumerate(neighbors):
result[f"n{i}_distance"] = neighbor.distance
for attr in self.neighbor_attributes:
result[f"n{i}_{attr}"] = getattr(neighbor.value, attr)
return result
def _fit(self, x: pd.DataFrame, y: pd.DataFrame = None, ctx=None):
self._train_x = x