Coverage for src/sensai/evaluation/eval_stats/eval_stats_clustering.py: 56%


import numpy as np
import sklearn
from typing import List, Dict, Tuple

from .eval_stats_base import EvalStats, TMetric
from ..eval_stats import Metric, abstractmethod, Sequence, ABC
from ...clustering import EuclideanClusterer


class ClusterLabelsEvalStats(EvalStats[TMetric], ABC):
    NUM_CLUSTERS = "numClusters"
    AV_SIZE = "averageClusterSize"
    MEDIAN_SIZE = "medianClusterSize"
    STDDEV_SIZE = "clusterSizeStd"
    MIN_SIZE = "minClusterSize"
    MAX_SIZE = "maxClusterSize"
    NOISE_SIZE = "noiseClusterSize"

    def __init__(self, labels: Sequence[int], noise_label: int, default_metrics: List[TMetric],
            additional_metrics: List[TMetric] = None):
        self.labels = np.array(labels)
        self.noiseLabel = noise_label

        # split off the noise cluster from the actual clusters and compute the cluster size distribution
        self.clusterLabelsMask: np.ndarray = self.labels != noise_label
        self.noiseLabelsMask: np.ndarray = np.logical_not(self.clusterLabelsMask)
        self.clustersLabels = self.labels[self.clusterLabelsMask]
        self.clusterIdentifiers, self.clusterSizeDistribution = \
            np.unique(self.labels[self.clusterLabelsMask], return_counts=True)
        self.noiseClusterSize = self.noiseLabelsMask.sum()

        # operations like max and min raise an exception for empty arrays; guard against that case
        if len(self.clusterSizeDistribution) == 0:
            self.clusterSizeDistribution = np.zeros(1)
        super().__init__(default_metrics, additional_metrics=additional_metrics)
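
    # Worked example (illustrative only, not part of the original module): for
    # labels = [-1, 0, 0, 1, -1] with noise_label = -1, we get
    # clusterLabelsMask = [False, True, True, True, False], clustersLabels = [0, 0, 1],
    # clusterIdentifiers = [0, 1], clusterSizeDistribution = [2, 1], and noiseClusterSize = 2.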

    def get_distribution_summary(self) -> Dict[str, float]:
        result = {
            self.NUM_CLUSTERS: len(self.clusterIdentifiers),
            self.AV_SIZE: self.clusterSizeDistribution.mean(),
            self.STDDEV_SIZE: self.clusterSizeDistribution.std(),
            self.MAX_SIZE: int(np.max(self.clusterSizeDistribution)),
            self.MIN_SIZE: int(np.min(self.clusterSizeDistribution)),
            self.MEDIAN_SIZE: np.median(self.clusterSizeDistribution)
        }
        if self.noiseLabel is not None:
            result[self.NOISE_SIZE] = int(self.noiseClusterSize)
        return result

    def metrics_dict(self) -> Dict[str, float]:
        metrics_dict = super().metrics_dict()
        metrics_dict.update(self.get_distribution_summary())
        return metrics_dict
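
    # For the toy labels above ([-1, 0, 0, 1, -1] with noise_label = -1), the cluster
    # sizes are [2, 1], so get_distribution_summary() would yield
    # {"numClusters": 2, "averageClusterSize": 1.5, "clusterSizeStd": 0.5,
    #  "maxClusterSize": 2, "minClusterSize": 1, "medianClusterSize": 1.5,
    #  "noiseClusterSize": 2}.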


class ClusteringUnsupervisedMetric(Metric["ClusteringUnsupervisedEvalStats"], ABC):
    pass


class RemovedNoiseUnsupervisedMetric(ClusteringUnsupervisedMetric):
    worstValue = 0

    def compute_value_for_eval_stats(self, eval_stats: "ClusteringUnsupervisedEvalStats") -> float:
        if len(eval_stats.clustersLabels) == 0:  # all points are noise
            return 0
        return self.compute_value(eval_stats.clustersDatapoints, eval_stats.clustersLabels)

    @staticmethod
    @abstractmethod
    def compute_value(datapoints: np.ndarray, labels: Sequence[int]):
        pass


class CalinskiHarabaszScore(RemovedNoiseUnsupervisedMetric):
    name = "CalinskiHarabaszScore"

    @staticmethod
    def compute_value(datapoints: np.ndarray, labels: Sequence[int]):
        return sklearn.metrics.calinski_harabasz_score(datapoints, labels)


class DaviesBouldinScore(RemovedNoiseUnsupervisedMetric):
    name = "DaviesBouldinScore"
    # TODO: in some edge cases this score could be larger than one; this should be investigated
    worstValue = 1

    @staticmethod
    def compute_value(datapoints: np.ndarray, labels: Sequence[int]):
        return sklearn.metrics.davies_bouldin_score(datapoints, labels)


# Note: this takes a long time to compute for many datapoints
class SilhouetteScore(RemovedNoiseUnsupervisedMetric):
    name = "SilhouetteScore"
    worstValue = -1

    @staticmethod
    def compute_value(datapoints: np.ndarray, labels: Sequence[int]):
        return sklearn.metrics.silhouette_score(datapoints, labels)
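

# A hypothetical illustration (not part of the original module): further unsupervised
# metrics can be added by subclassing RemovedNoiseUnsupervisedMetric and implementing
# compute_value; the removal of noise points is then handled by the base class.
# The cosine-based silhouette below is an assumed example, not an existing metric here.
class CosineSilhouetteScore(RemovedNoiseUnsupervisedMetric):
    name = "CosineSilhouetteScore"
    worstValue = -1

    @staticmethod
    def compute_value(datapoints: np.ndarray, labels: Sequence[int]):
        # silhouette_score supports alternative distance measures via its `metric` kwarg
        return sklearn.metrics.silhouette_score(datapoints, labels, metric="cosine")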


class ClusteringUnsupervisedEvalStats(ClusterLabelsEvalStats[ClusteringUnsupervisedMetric]):
    """
    Class containing methods to compute evaluation statistics of a clustering result
    """

    def __init__(self, datapoints: np.ndarray, labels: Sequence[int], noise_label=-1,
            metrics: Sequence[ClusteringUnsupervisedMetric] = None,
            additional_metrics: Sequence[ClusteringUnsupervisedMetric] = None):
        """
        :param datapoints: the datapoints that were clustered
        :param labels: sequence of labels, usually the output of some clustering algorithm
        :param noise_label: the label that is to be treated as noise
        :param metrics: the metrics to compute; if None, default metrics will be computed
        :param additional_metrics: metrics to compute in addition to the default metrics
        """
        if len(labels) != len(datapoints):
            raise ValueError("Length of labels does not match length of datapoints array")
        if metrics is None:
            # the silhouette score is not included by default because it takes long to compute
            metrics = [CalinskiHarabaszScore(), DaviesBouldinScore()]
        super().__init__(labels, noise_label, metrics, additional_metrics=additional_metrics)
        self.datapoints = datapoints
        self.clustersDatapoints = self.datapoints[self.clusterLabelsMask]
        self.noiseDatapoints = self.datapoints[self.noiseLabelsMask]

    @classmethod
    def from_model(cls, clustering_model: EuclideanClusterer):
        return cls(clustering_model.datapoints, clustering_model.labels, noise_label=clustering_model.noiseLabel)
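

# Usage sketch (illustrative only; assumes synthetic data and that the clustering
# finds at least two clusters, since the scores are undefined otherwise):
#
#     from sklearn.cluster import DBSCAN
#     datapoints = np.random.default_rng(0).normal(size=(100, 2))
#     labels = DBSCAN(eps=0.5).fit_predict(datapoints)  # -1 marks noise points
#     stats = ClusteringUnsupervisedEvalStats(datapoints, labels)
#     stats.metrics_dict()  # scores plus the cluster size distribution summary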


class ClusteringSupervisedMetric(Metric["ClusteringSupervisedEvalStats"], ABC):
    pass


class RemovedCommonNoiseSupervisedMetric(ClusteringSupervisedMetric, ABC):
    worstValue = 0

    def compute_value_for_eval_stats(self, eval_stats: "ClusteringSupervisedEvalStats") -> float:
        labels, true_labels = eval_stats.labels_with_removed_common_noise()
        if len(labels) == 0:
            return self.worstValue
        return self.compute_value(labels, true_labels)

    @staticmethod
    @abstractmethod
    def compute_value(labels: Sequence[int], true_labels: Sequence[int]):
        pass


class VMeasureScore(RemovedCommonNoiseSupervisedMetric):
    name = "VMeasureScore"

    @staticmethod
    def compute_value(labels: Sequence[int], true_labels: Sequence[int]):
        return sklearn.metrics.v_measure_score(labels, true_labels)


class AdjustedRandScore(RemovedCommonNoiseSupervisedMetric):
    name = "AdjustedRandScore"
    worstValue = -1

    @staticmethod
    def compute_value(labels: Sequence[int], true_labels: Sequence[int]):
        return sklearn.metrics.adjusted_rand_score(labels, true_labels)


class FowlkesMallowsScore(RemovedCommonNoiseSupervisedMetric):
    name = "FowlkesMallowsScore"

    @staticmethod
    def compute_value(labels: Sequence[int], true_labels: Sequence[int]):
        return sklearn.metrics.fowlkes_mallows_score(labels, true_labels)


class AdjustedMutualInfoScore(RemovedCommonNoiseSupervisedMetric):
    name = "AdjustedMutualInfoScore"

    @staticmethod
    def compute_value(labels: Sequence[int], true_labels: Sequence[int]):
        return sklearn.metrics.adjusted_mutual_info_score(labels, true_labels)


class ClusteringSupervisedEvalStats(ClusterLabelsEvalStats[ClusteringSupervisedMetric]):
    """
    Class containing methods to compute evaluation statistics of a clustering result based on ground truth clusters
    """

    def __init__(self, labels: Sequence[int], true_labels: Sequence[int], noise_label=-1,
            metrics: Sequence[ClusteringSupervisedMetric] = None,
            additional_metrics: Sequence[ClusteringSupervisedMetric] = None):
        """
        :param labels: sequence of labels, usually the output of some clustering algorithm
        :param true_labels: sequence of labels that represent the ground truth clusters
        :param noise_label: the label that is to be treated as noise
        :param metrics: the metrics to compute; if None, default metrics will be computed
        :param additional_metrics: metrics to compute in addition to the default metrics
        """
        if len(labels) != len(true_labels):
            raise ValueError("true_labels must be of the same length as labels")
        self.trueLabels = np.array(true_labels)
        self._labels_with_removed_common_noise = None
        if metrics is None:
            metrics = [VMeasureScore(), FowlkesMallowsScore(), AdjustedRandScore(), AdjustedMutualInfoScore()]
        super().__init__(labels, noise_label, metrics, additional_metrics=additional_metrics)

    @classmethod
    def from_model(cls, clustering_model: EuclideanClusterer, true_labels: Sequence[int]):
        return cls(clustering_model.labels, true_labels, noise_label=clustering_model.noiseLabel)

    def labels_with_removed_common_noise(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        :return: a pair (labels, true_labels) from which points that are classified as noise in both the
            predicted and the true labels have been removed
        """
        if self._labels_with_removed_common_noise is None:
            if self.noiseLabel is None:
                self._labels_with_removed_common_noise = self.labels, self.trueLabels
            else:
                common_noise_labels_mask = np.logical_and(self.noiseLabelsMask, self.trueLabels == self.noiseLabel)
                kept_labels_mask = np.logical_not(common_noise_labels_mask)
                self._labels_with_removed_common_noise = self.labels[kept_labels_mask], self.trueLabels[kept_labels_mask]
        return self._labels_with_removed_common_noise
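

# Usage sketch (illustrative only): the default supervised scores are invariant
# under permutations of cluster identifiers, so a relabeled but identical partition
# scores as a perfect match once common noise points have been removed:
#
#     true_labels = [0, 0, 1, 1, -1]
#     predicted = [1, 1, 0, 0, -1]  # same partition, swapped ids, shared noise point
#     stats = ClusteringSupervisedEvalStats(predicted, true_labels)
#     stats.metrics_dict()  # V-measure, ARI, AMI and Fowlkes-Mallows all equal 1.0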