Coverage for src/sensai/clustering/clustering_base.py: 35%

110 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-13 22:17 +0000

1import logging 

2from abc import ABC, abstractmethod 

3from typing import Union, Set, Callable, Iterable, Optional 

4 

5import numpy as np 

6import pandas as pd 

7from scipy.spatial import distance_matrix 

8 

9from ..util.cache import PickleLoadSaveMixin 

10 

11log = logging.getLogger(__name__) 

12 

13 

14# TODO at some point in the future: generalize to other input and deal with algorithms that allow prediction of labels 

class EuclideanClusterer(PickleLoadSaveMixin, ABC):
    """
    Base class for all clustering algorithms. Supports noise clusters and relabelling of identified clusters as noise
    based on their size.

    Subclasses implement :meth:`_compute_labels`; :meth:`fit` calls it, applies the size bounds and stores the
    datapoints together with their labels. Clusters are materialised lazily via :meth:`get_cluster` and cached.

    :param noise_label: label that is associated with the noise cluster or None
    :param min_cluster_size: if not None, clusters below this size will be labeled as noise
    :param max_cluster_size: if not None, clusters above this size will be labeled as noise
    :raises ValueError: if a size bound is given while noise_label is None
    """
    def __init__(self, noise_label=-1, min_cluster_size: Optional[int] = None, max_cluster_size: Optional[int] = None):
        self._datapoints: Optional[np.ndarray] = None
        self._labels: Optional[np.ndarray] = None
        self._clusterIdentifiers: Optional[Set[int]] = None
        self._nonNoiseClusterIdentifiers: Optional[Set[int]] = None

        # size bounds relabel clusters as noise, which requires a label to relabel them with
        if min_cluster_size is not None or max_cluster_size is not None:
            if noise_label is None:
                raise ValueError("the noise label has to be not None for non-trivial bounds on cluster sizes")
        self.noiseLabel = noise_label
        # missing bounds are stored as +/- infinity such that the size check in fit is a single chained comparison
        self.maxClusterSize = max_cluster_size if max_cluster_size is not None else np.inf
        self.minClusterSize = min_cluster_size if min_cluster_size is not None else -np.inf

        # cache of Cluster instances, keyed by cluster identifier
        self._clusterDict = {}
        self._numClusters: Optional[int] = None

    class Cluster:
        """
        The datapoints that share one cluster label. Centroid and radius are computed on first access and cached.

        :param datapoints: array of the datapoints belonging to the cluster
        :param identifier: the label of the cluster
        """
        def __init__(self, datapoints: np.ndarray, identifier: Union[int, str]):
            self.datapoints = datapoints
            self.identifier = identifier
            self._radius: Optional[float] = None
            self._centroid: Optional[np.ndarray] = None

        def __len__(self):
            return len(self.datapoints)

        def __str__(self):
            return f"{self.__class__.__name__}_{self.identifier}"

        def _compute_radius(self):
            # largest distance between the centroid and any datapoint of the cluster
            return np.max(distance_matrix([self.centroid()], self.datapoints))

        def _compute_centroid(self):
            return np.mean(self.datapoints, axis=0)

        def centroid(self):
            """
            :return: the mean of the cluster's datapoints along axis 0 (cached after the first call)
            """
            if self._centroid is None:
                self._centroid = self._compute_centroid()
            return self._centroid

        def radius(self):
            """
            :return: the maximal distance of a datapoint of the cluster to the centroid (cached after the first call)
            """
            if self._radius is None:
                self._radius = self._compute_radius()
            return self._radius

        def summary_dict(self):
            """
            :return: dictionary containing coarse information about the cluster (e.g. num_members and centroid)
            """
            return {
                "identifier": self.identifier,
                "centroid": self.centroid(),
                "numMembers": len(self),
                "radius": self.radius()
            }

    @classmethod
    def __str__(cls):
        return cls.__name__

    def clusters(self, condition: Callable[[Cluster], bool] = None) -> Iterable[Cluster]:
        """
        :param condition: if provided, only clusters fulfilling the condition will be included
        :return: generator of clusters (the noise cluster is not included)
        """
        total = self.num_clusters
        percentage_to_log = 0
        for i, cluster_id in enumerate(self._nonNoiseClusterIdentifiers):
            # logging progress through the loop in steps of (at least) 5%
            percentage_generated = int(100 * i / total)
            # ">=" rather than "==": with few clusters the percentage skips over the multiples of 5
            if percentage_generated >= percentage_to_log:
                reached = percentage_generated - percentage_generated % 5
                log.info(f"Processed {reached}% of clusters")
                percentage_to_log = reached + 5

            cluster = self.get_cluster(cluster_id)
            if condition is None or condition(cluster):
                yield cluster

    def noise_cluster(self):
        """
        :return: the cluster made up of the datapoints carrying the noise label
        :raises NotImplementedError: if the clusterer was created with noise_label=None
        :raises KeyError: if no datapoint carries the noise label
        """
        if self.noiseLabel is None:
            raise NotImplementedError(f"The algorithm {self} does not provide a noise cluster")
        return self.get_cluster(self.noiseLabel)

    def summary_df(self, condition: Callable[[Cluster], bool] = None):
        """
        :param condition: if provided, only clusters fulfilling the condition will be included
        :return: pandas DataFrame containing coarse information about the clusters, indexed by the cluster identifier;
            an empty DataFrame (with an empty index named "identifier") if there is no matching cluster
        """
        summary_dicts = [cluster.summary_dict() for cluster in self.clusters(condition=condition)]
        if not summary_dicts:
            # without rows there is no "identifier" column and set_index would raise a KeyError
            return pd.DataFrame(index=pd.Index([], name="identifier"))
        return pd.DataFrame(summary_dicts).set_index("identifier", drop=True)

    def fit(self, data: np.ndarray) -> None:
        """
        Computes the cluster labels for the given datapoints, relabels clusters violating the size bounds as noise
        and stores datapoints and labels. Clusters cached from a previous call are discarded.

        :param data: the datapoints to cluster
        :raises Exception: if the number of labels returned by the implementation does not match the number of datapoints
        """
        log.info(f"Fitting {self} to {len(data)} coordinate datapoints.")
        # implementations may return a plain list; the masking below needs an array (no copy is made for arrays)
        labels = np.asarray(self._compute_labels(data))
        if len(labels) != len(data):
            raise Exception("Bad Implementation: number of labels does not match number of datapoints")
        # Relabel clusters that do not fulfill size bounds as noise
        if self.minClusterSize != -np.inf or self.maxClusterSize != np.inf:
            for cluster_id, cluster_size in zip(*np.unique(labels, return_counts=True)):
                if not self.minClusterSize <= cluster_size <= self.maxClusterSize:
                    labels[labels == cluster_id] = self.noiseLabel

        self._datapoints = data
        self._labels = labels
        self._clusterIdentifiers = set(labels)
        if self.noiseLabel is not None:
            self._nonNoiseClusterIdentifiers = self._clusterIdentifiers.difference({self.noiseLabel})
        else:
            # no noise label: every cluster counts (previously left as None, which broke num_clusters)
            self._nonNoiseClusterIdentifiers = set(self._clusterIdentifiers)
        # cached clusters refer to the previously fitted data
        self._clusterDict = {}
        log.info(f"{self} found {self.num_clusters} clusters")

    @property
    def is_fitted(self):
        """
        :return: True if datapoints have been stored by a call to fit
        """
        return self._datapoints is not None

    @property
    def datapoints(self) -> np.ndarray:
        """
        :return: the datapoints passed to fit
        """
        assert self.is_fitted
        return self._datapoints

    @property
    def labels(self) -> np.ndarray:
        """
        :return: the cluster labels of the datapoints (after relabelling by size bounds)
        """
        assert self.is_fitted
        return self._labels

    @property
    def cluster_identifiers(self) -> Set[int]:
        """
        :return: the set of all labels that occur, including the noise label if it occurs
        """
        assert self.is_fitted
        return self._clusterIdentifiers

    # unfortunately, there seems to be no way to annotate the return type correctly
    # https://github.com/python/mypy/issues/3993
    def get_cluster(self, cluster_id: int) -> Cluster:
        """
        :param cluster_id: the label of the cluster to retrieve
        :return: the cluster with the given label; the instance is created on first access and cached
        :raises KeyError: if no datapoint carries the given label
        """
        # membership test against the identifier set instead of scanning the whole label array
        if cluster_id not in self.cluster_identifiers:
            raise KeyError(f"no cluster for id {cluster_id}")
        cluster = self._clusterDict.get(cluster_id)
        if cluster is None:
            cluster = self.Cluster(self.datapoints[self.labels == cluster_id], identifier=cluster_id)
            self._clusterDict[cluster_id] = cluster
        return cluster

    @property
    def num_clusters(self) -> int:
        """
        :return: the number of clusters, not counting the noise cluster
        """
        return len(self._nonNoiseClusterIdentifiers)

    @abstractmethod
    def _compute_labels(self, x: np.ndarray) -> np.ndarray:
        """
        Fit the clustering model and return an array of integer cluster labels

        :param x: the datapoints
        :return: list of the same length as the input datapoints; it represents the mapping coordinate -> cluster_id
        """
        pass