Coverage for src/sensai/evaluation/eval_stats/eval_stats_clustering.py: 56%
130 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-13 22:17 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-13 22:17 +0000
1import numpy as np
2import sklearn
3from typing import List, Dict, Tuple
5from .eval_stats_base import EvalStats, TMetric
6from ..eval_stats import Metric, abstractmethod, Sequence, ABC
7from ...clustering import EuclideanClusterer
class ClusterLabelsEvalStats(EvalStats[TMetric], ABC):
    """
    Base class for evaluation statistics that are derived from a sequence of cluster labels.

    On construction, the labels are separated into noise points and points belonging to regular clusters,
    and the size distribution of the regular clusters is determined.
    """
    NUM_CLUSTERS = "numClusters"
    AV_SIZE = "averageClusterSize"
    MEDIAN_SIZE = "medianClusterSize"
    STDDEV_SIZE = "clusterSizeStd"
    MIN_SIZE = "minClusterSize"
    MAX_SIZE = "maxClusterSize"
    NOISE_SIZE = "noiseClusterSize"

    def __init__(self, labels: Sequence[int], noise_label: int, default_metrics: List[TMetric],
            additional_metrics: List[TMetric] = None):
        """
        :param labels: the cluster labels to evaluate; they are converted to a numpy array
        :param noise_label: the label that marks noise points; points carrying it are excluded
            from the cluster size distribution
        :param default_metrics: the metrics passed on to the base class constructor
        :param additional_metrics: further metrics passed on to the base class constructor
        """
        self.labels = np.array(labels)
        self.noiseLabel = noise_label

        # boolean masks selecting the points of regular clusters and the noise points, respectively
        is_clustered: np.ndarray = self.labels != noise_label
        self.clusterLabelsMask: np.ndarray = is_clustered
        self.noiseLabelsMask: np.ndarray = np.logical_not(is_clustered)
        self.clustersLabels = self.labels[is_clustered]

        identifiers, sizes = np.unique(self.clustersLabels, return_counts=True)
        # max and min raise an exception for empty arrays; a single zero entry avoids this
        if sizes.size == 0:
            sizes = np.zeros(1)
        self.clusterIdentifiers = identifiers
        self.clusterSizeDistribution = sizes
        self.noiseClusterSize = self.noiseLabelsMask.sum()

        super().__init__(default_metrics, additional_metrics=additional_metrics)

    def get_distribution_summary(self) -> Dict[str, float]:
        """
        :return: dictionary with the number of clusters and the mean, standard deviation, maximum, minimum
            and median of the cluster sizes; if a noise label is set, the size of the noise cluster is included
        """
        sizes = self.clusterSizeDistribution
        summary = {
            self.NUM_CLUSTERS: len(self.clusterIdentifiers),
            self.AV_SIZE: np.mean(sizes),
            self.STDDEV_SIZE: np.std(sizes),
            self.MAX_SIZE: int(sizes.max()),
            self.MIN_SIZE: int(sizes.min()),
            self.MEDIAN_SIZE: np.median(sizes),
        }
        if self.noiseLabel is None:
            return summary
        summary[self.NOISE_SIZE] = int(self.noiseClusterSize)
        return summary

    def metrics_dict(self) -> Dict[str, float]:
        """
        :return: the metrics dictionary of the base class, extended by the cluster size distribution summary
        """
        all_values = super().metrics_dict()
        all_values.update(self.get_distribution_summary())
        return all_values
class ClusteringUnsupervisedMetric(Metric["ClusteringUnsupervisedEvalStats"], ABC):
    """
    Base class for metrics that are computed on a ClusteringUnsupervisedEvalStats instance
    """
class RemovedNoiseUnsupervisedMetric(ClusteringUnsupervisedMetric):
    """
    Base class for unsupervised metrics that are computed only on the points which were not labelled as noise.

    Subclasses implement :meth:`compute_value` and may override ``worstValue``, which is the value
    reported when no non-noise points are available.
    """
    worstValue = 0

    def compute_value_for_eval_stats(self, eval_stats: "ClusteringUnsupervisedEvalStats") -> float:
        """
        :param eval_stats: the evaluation statistics providing the non-noise datapoints and their labels
        :return: the metric value for the non-noise points, or ``worstValue`` if all points are noise
        """
        if len(eval_stats.clustersLabels) == 0:  # all is noise
            # Use the metric's own worst value rather than a fixed 0: subclasses override worstValue,
            # and for a metric where lower is better a 0 would wrongly rate an all-noise result as optimal.
            return self.worstValue
        # NOTE(review): the sklearn scores used by subclasses presumably reject inputs with fewer than
        # two distinct clusters - confirm whether that case should map to worstValue as well.
        return self.compute_value(eval_stats.clustersDatapoints, eval_stats.clustersLabels)

    @staticmethod
    @abstractmethod
    def compute_value(datapoints: np.ndarray, labels: Sequence[int]):
        """
        :param datapoints: the datapoints that were not labelled as noise
        :param labels: the cluster labels of these datapoints
        :return: the metric value
        """
        pass
class CalinskiHarabaszScore(RemovedNoiseUnsupervisedMetric):
    """
    Metric that delegates to sklearn's calinski_harabasz_score
    """
    name = "CalinskiHarabaszScore"

    @staticmethod
    def compute_value(datapoints: np.ndarray, labels: Sequence[int]):
        """
        :param datapoints: the datapoints to evaluate
        :param labels: the cluster labels of the datapoints
        :return: the result of sklearn.metrics.calinski_harabasz_score for the given input
        """
        # A plain `import sklearn` does not guarantee that the `sklearn.metrics` submodule is loaded,
        # so the function is imported explicitly instead of being reached via attribute access.
        from sklearn.metrics import calinski_harabasz_score
        return calinski_harabasz_score(datapoints, labels)
class DaviesBouldinScore(RemovedNoiseUnsupervisedMetric):
    """
    Metric that delegates to sklearn's davies_bouldin_score
    """
    name = "DaviesBouldinScore"
    # TODO: I think in some edge cases this score could be larger than one, one should look into that
    worstValue = 1

    @staticmethod
    def compute_value(datapoints: np.ndarray, labels: Sequence[int]):
        """
        :param datapoints: the datapoints to evaluate
        :param labels: the cluster labels of the datapoints
        :return: the result of sklearn.metrics.davies_bouldin_score for the given input
        """
        # A plain `import sklearn` does not guarantee that the `sklearn.metrics` submodule is loaded,
        # so the function is imported explicitly instead of being reached via attribute access.
        from sklearn.metrics import davies_bouldin_score
        return davies_bouldin_score(datapoints, labels)
# Note: this takes a lot of time to compute for many datapoints
class SilhouetteScore(RemovedNoiseUnsupervisedMetric):
    """
    Metric that delegates to sklearn's silhouette_score
    """
    name = "SilhouetteScore"
    worstValue = -1

    @staticmethod
    def compute_value(datapoints: np.ndarray, labels: Sequence[int]):
        """
        :param datapoints: the datapoints to evaluate
        :param labels: the cluster labels of the datapoints
        :return: the result of sklearn.metrics.silhouette_score for the given input
        """
        # A plain `import sklearn` does not guarantee that the `sklearn.metrics` submodule is loaded,
        # so the function is imported explicitly instead of being reached via attribute access.
        from sklearn.metrics import silhouette_score
        return silhouette_score(datapoints, labels)
class ClusteringUnsupervisedEvalStats(ClusterLabelsEvalStats[ClusteringUnsupervisedMetric]):
    """
    Evaluation statistics of a clustering result which are computed from the clustered datapoints
    and their labels alone
    """

    def __init__(self, datapoints: np.ndarray, labels: Sequence[int], noise_label=-1,
            metrics: Sequence[ClusteringUnsupervisedMetric] = None,
            additional_metrics: Sequence[ClusteringUnsupervisedMetric] = None):
        """
        :param datapoints: datapoints that were clustered
        :param labels: sequence of labels, usually the output of some clustering algorithm
        :param noise_label: the label that marks noise points
        :param metrics: the metrics to compute. If None, will compute default metrics
        :param additional_metrics: the metrics to additionally compute
        :raises ValueError: if the number of labels differs from the number of datapoints
        """
        if len(labels) != len(datapoints):
            raise ValueError("Length of labels does not match length of datapoints array")

        # Silhouette score is not included by default because it takes long to compute
        selected_metrics = [CalinskiHarabaszScore(), DaviesBouldinScore()] if metrics is None else metrics
        super().__init__(labels, noise_label, selected_metrics, additional_metrics=additional_metrics)

        # the masks used below are created by the base class constructor
        self.datapoints = datapoints
        self.clustersDatapoints = self.datapoints[self.clusterLabelsMask]
        self.noiseDatapoints = self.datapoints[self.noiseLabelsMask]

    @classmethod
    def from_model(cls, clustering_model: EuclideanClusterer):
        """
        :param clustering_model: the model whose datapoints, labels and noise label are evaluated
        :return: an instance using the default metrics
        """
        model = clustering_model
        return cls(model.datapoints, model.labels, noise_label=model.noiseLabel)
class ClusteringSupervisedMetric(Metric["ClusteringSupervisedEvalStats"], ABC):
    """
    Base class for metrics that are computed on a ClusteringSupervisedEvalStats instance
    """
class RemovedCommonNoiseSupervisedMetric(ClusteringSupervisedMetric, ABC):
    """
    Base class for supervised metrics that are computed on the labels remaining after the points
    marked as noise in both the predicted and the true labels have been removed
    """
    worstValue = 0

    def compute_value_for_eval_stats(self, eval_stats: "ClusteringSupervisedEvalStats") -> float:
        """
        :param eval_stats: the evaluation statistics providing the predicted and the true labels
        :return: the metric value for the remaining labels, or ``worstValue`` if no labels remain
        """
        predicted, actual = eval_stats.labels_with_removed_common_noise()
        nothing_left = len(predicted) == 0
        return self.worstValue if nothing_left else self.compute_value(predicted, actual)

    @staticmethod
    @abstractmethod
    def compute_value(labels: Sequence[int], true_labels: Sequence[int]):
        """
        :param labels: the predicted cluster labels
        :param true_labels: the ground truth cluster labels
        :return: the metric value
        """
        pass
class VMeasureScore(RemovedCommonNoiseSupervisedMetric):
    """
    Metric that delegates to sklearn's v_measure_score
    """
    name = "VMeasureScore"

    @staticmethod
    def compute_value(labels: Sequence[int], true_labels: Sequence[int]):
        """
        :param labels: the predicted cluster labels
        :param true_labels: the ground truth cluster labels
        :return: the result of sklearn.metrics.v_measure_score for the given input
        """
        # A plain `import sklearn` does not guarantee that the `sklearn.metrics` submodule is loaded,
        # so the function is imported explicitly instead of being reached via attribute access.
        from sklearn.metrics import v_measure_score
        # NOTE(review): the predicted labels are passed first, the true labels second; this is presumably
        # fine because the score is symmetric in its arguments - confirm against the sklearn documentation.
        return v_measure_score(labels, true_labels)
class AdjustedRandScore(RemovedCommonNoiseSupervisedMetric):
    """
    Metric that delegates to sklearn's adjusted_rand_score
    """
    name = "AdjustedRandScore"
    worstValue = -1

    @staticmethod
    def compute_value(labels: Sequence[int], true_labels: Sequence[int]):
        """
        :param labels: the predicted cluster labels
        :param true_labels: the ground truth cluster labels
        :return: the result of sklearn.metrics.adjusted_rand_score for the given input
        """
        # A plain `import sklearn` does not guarantee that the `sklearn.metrics` submodule is loaded,
        # so the function is imported explicitly instead of being reached via attribute access.
        from sklearn.metrics import adjusted_rand_score
        # NOTE(review): the predicted labels are passed first, the true labels second; this is presumably
        # fine because the score is symmetric in its arguments - confirm against the sklearn documentation.
        return adjusted_rand_score(labels, true_labels)
class FowlkesMallowsScore(RemovedCommonNoiseSupervisedMetric):
    """
    Metric that delegates to sklearn's fowlkes_mallows_score
    """
    name = "FowlkesMallowsScore"

    @staticmethod
    def compute_value(labels: Sequence[int], true_labels: Sequence[int]):
        """
        :param labels: the predicted cluster labels
        :param true_labels: the ground truth cluster labels
        :return: the result of sklearn.metrics.fowlkes_mallows_score for the given input
        """
        # A plain `import sklearn` does not guarantee that the `sklearn.metrics` submodule is loaded,
        # so the function is imported explicitly instead of being reached via attribute access.
        from sklearn.metrics import fowlkes_mallows_score
        # NOTE(review): the predicted labels are passed first, the true labels second; this is presumably
        # fine because the score is symmetric in its arguments - confirm against the sklearn documentation.
        return fowlkes_mallows_score(labels, true_labels)
class AdjustedMutualInfoScore(RemovedCommonNoiseSupervisedMetric):
    """
    Metric that delegates to sklearn's adjusted_mutual_info_score
    """
    name = "AdjustedMutualInfoScore"

    @staticmethod
    def compute_value(labels: Sequence[int], true_labels: Sequence[int]):
        """
        :param labels: the predicted cluster labels
        :param true_labels: the ground truth cluster labels
        :return: the result of sklearn.metrics.adjusted_mutual_info_score for the given input
        """
        # A plain `import sklearn` does not guarantee that the `sklearn.metrics` submodule is loaded,
        # so the function is imported explicitly instead of being reached via attribute access.
        from sklearn.metrics import adjusted_mutual_info_score
        # NOTE(review): the predicted labels are passed first, the true labels second; this is presumably
        # fine because the score is symmetric in its arguments - confirm against the sklearn documentation.
        return adjusted_mutual_info_score(labels, true_labels)
class ClusteringSupervisedEvalStats(ClusterLabelsEvalStats[ClusteringSupervisedMetric]):
    """
    Evaluation statistics of a clustering result which are computed by comparing the assigned labels
    to ground truth cluster labels
    """
    def __init__(self, labels: Sequence[int], true_labels: Sequence[int], noise_label=-1,
            metrics: Sequence[ClusteringSupervisedMetric] = None,
            additional_metrics: Sequence[ClusteringSupervisedMetric] = None):
        """
        :param labels: sequence of labels, usually the output of some clustering algorithm
        :param true_labels: sequence of labels that represent the ground truth clusters
        :param noise_label: the label that marks noise points
        :param metrics: the metrics to compute. If None, will compute default metrics
        :param additional_metrics: the metrics to additionally compute
        :raises ValueError: if labels and true_labels differ in length
        """
        if len(labels) != len(true_labels):
            raise ValueError("true labels must be of same shape as labels")
        self.trueLabels = np.array(true_labels)
        # cache for the result of labels_with_removed_common_noise, filled on first access
        self._labels_with_removed_common_noise = None
        selected_metrics = metrics
        if selected_metrics is None:
            selected_metrics = [VMeasureScore(), FowlkesMallowsScore(), AdjustedRandScore(), AdjustedMutualInfoScore()]
        super().__init__(labels, noise_label, selected_metrics, additional_metrics=additional_metrics)

    @classmethod
    def from_model(cls, clustering_model: EuclideanClusterer, true_labels: Sequence[int]):
        """
        :param clustering_model: the model whose labels and noise label are evaluated
        :param true_labels: sequence of labels that represent the ground truth clusters
        :return: an instance using the default metrics
        """
        model = clustering_model
        return cls(model.labels, true_labels, noise_label=model.noiseLabel)

    def labels_with_removed_common_noise(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        The result is computed on the first call and cached for subsequent calls.

        :return: tuple (labels, true_labels) where points classified as noise in true and predicted data were removed
        """
        cached = self._labels_with_removed_common_noise
        if cached is not None:
            return cached

        if self.noiseLabel is None:
            # without a noise label there is nothing to remove
            result = self.labels, self.trueLabels
        else:
            is_common_noise = np.logical_and(self.noiseLabelsMask, self.trueLabels == self.noiseLabel)
            keep = np.logical_not(is_common_noise)
            result = self.labels[keep], self.trueLabels[keep]

        self._labels_with_removed_common_noise = result
        return result