Source code for sensai.evaluation.eval_stats.eval_stats_clustering

from abc import ABC, abstractmethod
from typing import Dict, List, Sequence, Tuple

import numpy as np
import sklearn.metrics

from .eval_stats_base import EvalStats, TMetric
from ..eval_stats import Metric
from ...clustering import EuclideanClusterer


class ClusterLabelsEvalStats(EvalStats[TMetric], ABC):
    NUM_CLUSTERS = "numClusters"
    AV_SIZE = "averageClusterSize"
    MEDIAN_SIZE = "medianClusterSize"
    STDDEV_SIZE = "clusterSizeStd"
    MIN_SIZE = "minClusterSize"
    MAX_SIZE = "maxClusterSize"
    NOISE_SIZE = "noiseClusterSize"

    def __init__(self, labels: Sequence[int], noise_label: int, default_metrics: List[TMetric],
            additional_metrics: List[TMetric] = None):
        self.labels = np.array(labels)
        self.noiseLabel = noise_label

        # split off the noise cluster from the actual clusters and compute the cluster size distribution
        self.clusterLabelsMask: np.ndarray = self.labels != noise_label
        self.noiseLabelsMask: np.ndarray = np.logical_not(self.clusterLabelsMask)
        self.clustersLabels = self.labels[self.clusterLabelsMask]
        self.clusterIdentifiers, self.clusterSizeDistribution = \
            np.unique(self.labels[self.clusterLabelsMask], return_counts=True)
        self.noiseClusterSize = self.noiseLabelsMask.sum()

        # operations like max and min raise an exception for empty arrays; this counteracts that
        if len(self.clusterSizeDistribution) == 0:
            self.clusterSizeDistribution = np.zeros(1)

        super().__init__(default_metrics, additional_metrics=additional_metrics)

    def get_distribution_summary(self) -> Dict[str, float]:
        result = {
            self.NUM_CLUSTERS: len(self.clusterIdentifiers),
            self.AV_SIZE: self.clusterSizeDistribution.mean(),
            self.STDDEV_SIZE: self.clusterSizeDistribution.std(),
            self.MAX_SIZE: int(np.max(self.clusterSizeDistribution)),
            self.MIN_SIZE: int(np.min(self.clusterSizeDistribution)),
            self.MEDIAN_SIZE: np.median(self.clusterSizeDistribution)
        }
        if self.noiseLabel is not None:
            result[self.NOISE_SIZE] = int(self.noiseClusterSize)
        return result

    def metrics_dict(self) -> Dict[str, float]:
        metrics_dict = super().metrics_dict()
        metrics_dict.update(self.get_distribution_summary())
        return metrics_dict
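
# Worked example (illustrative, not part of the original source): for labels [0, 0, 1, -1]
# with noise_label=-1, the cluster sizes are [2, 1], so get_distribution_summary() of a
# concrete subclass yields numClusters=2, averageClusterSize=1.5, medianClusterSize=1.5,
# clusterSizeStd=0.5, minClusterSize=1, maxClusterSize=2 and noiseClusterSize=1.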


class ClusteringUnsupervisedMetric(Metric["ClusteringUnsupervisedEvalStats"], ABC):
    pass


class RemovedNoiseUnsupervisedMetric(ClusteringUnsupervisedMetric, ABC):
    worstValue = 0

    def compute_value_for_eval_stats(self, eval_stats: "ClusteringUnsupervisedEvalStats") -> float:
        if len(eval_stats.clustersLabels) == 0:  # all points are noise
            return self.worstValue
        return self.compute_value(eval_stats.clustersDatapoints, eval_stats.clustersLabels)

    @staticmethod
    @abstractmethod
    def compute_value(datapoints: np.ndarray, labels: Sequence[int]):
        pass


class CalinskiHarabaszScore(RemovedNoiseUnsupervisedMetric):
    name = "CalinskiHarabaszScore"

    @staticmethod
    def compute_value(datapoints: np.ndarray, labels: Sequence[int]):
        return sklearn.metrics.calinski_harabasz_score(datapoints, labels)


class DaviesBouldinScore(RemovedNoiseUnsupervisedMetric):
    name = "DaviesBouldinScore"
    # the Davies-Bouldin index is non-negative and unbounded above (lower values are better),
    # so 1 is a heuristic choice rather than a true worst value
    worstValue = 1

    @staticmethod
    def compute_value(datapoints: np.ndarray, labels: Sequence[int]):
        return sklearn.metrics.davies_bouldin_score(datapoints, labels)


# Note: the silhouette score is based on pairwise distances and thus scales quadratically
# with the number of datapoints, so it takes a lot of time to compute for large datasets
class SilhouetteScore(RemovedNoiseUnsupervisedMetric):
    name = "SilhouetteScore"
    worstValue = -1

    @staticmethod
    def compute_value(datapoints: np.ndarray, labels: Sequence[int]):
        return sklearn.metrics.silhouette_score(datapoints, labels)
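

# For large datasets, sklearn can estimate the silhouette score on a random subsample
# via the sample_size parameter of sklearn.metrics.silhouette_score. A minimal sketch of
# such a metric follows (an assumption, not part of the original module; the class name
# and the subsample size of 1000 are illustrative choices):
class SubsampledSilhouetteScore(RemovedNoiseUnsupervisedMetric):
    name = "SubsampledSilhouetteScore"
    worstValue = -1

    @staticmethod
    def compute_value(datapoints: np.ndarray, labels: Sequence[int]):
        # estimate the score on at most 1000 randomly drawn points; fixing random_state
        # makes repeated evaluations comparable
        sample_size = min(1000, len(datapoints))
        return sklearn.metrics.silhouette_score(datapoints, labels, sample_size=sample_size, random_state=42)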


class ClusteringUnsupervisedEvalStats(ClusterLabelsEvalStats[ClusteringUnsupervisedMetric]):
    """
    Class containing methods to compute evaluation statistics of a clustering result
    """

    def __init__(self, datapoints: np.ndarray, labels: Sequence[int], noise_label=-1,
            metrics: Sequence[ClusteringUnsupervisedMetric] = None,
            additional_metrics: Sequence[ClusteringUnsupervisedMetric] = None):
        """
        :param datapoints: datapoints that were clustered
        :param labels: sequence of labels, usually the output of some clustering algorithm
        :param noise_label: the label that denotes noise (datapoints not assigned to any cluster)
        :param metrics: the metrics to compute. If None, will compute default metrics
        :param additional_metrics: the metrics to additionally compute
        """
        if len(labels) != len(datapoints):
            raise ValueError("Length of labels does not match length of datapoints array")
        if metrics is None:
            # the silhouette score is not included by default because it takes long to compute
            metrics = [CalinskiHarabaszScore(), DaviesBouldinScore()]
        super().__init__(labels, noise_label, metrics, additional_metrics=additional_metrics)
        self.datapoints = datapoints
        self.clustersDatapoints = self.datapoints[self.clusterLabelsMask]
        self.noiseDatapoints = self.datapoints[self.noiseLabelsMask]

    @classmethod
    def from_model(cls, clustering_model: EuclideanClusterer):
        return cls(clustering_model.datapoints, clustering_model.labels, noise_label=clustering_model.noiseLabel)
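

# A hedged usage sketch (illustrative only; the helper name and the synthetic data are
# assumptions, not part of the original module): computes the unsupervised metrics and
# the cluster size distribution for a random labeling of random 2D points.
def _demo_unsupervised_eval_stats() -> Dict[str, float]:
    rng = np.random.default_rng(42)
    datapoints = rng.normal(size=(100, 2))
    labels = rng.integers(-1, 3, size=100)  # labels in {-1, 0, 1, 2}, where -1 denotes noise
    stats = ClusteringUnsupervisedEvalStats(datapoints, labels, noise_label=-1)
    return stats.metrics_dict()  # contains CalinskiHarabaszScore, DaviesBouldinScore, numClusters, ...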


class ClusteringSupervisedMetric(Metric["ClusteringSupervisedEvalStats"], ABC):
    pass


class RemovedCommonNoiseSupervisedMetric(ClusteringSupervisedMetric, ABC):
    worstValue = 0

    def compute_value_for_eval_stats(self, eval_stats: "ClusteringSupervisedEvalStats") -> float:
        labels, true_labels = eval_stats.labels_with_removed_common_noise()
        if len(labels) == 0:
            return self.worstValue
        return self.compute_value(labels, true_labels)

    @staticmethod
    @abstractmethod
    def compute_value(labels: Sequence[int], true_labels: Sequence[int]):
        pass


class VMeasureScore(RemovedCommonNoiseSupervisedMetric):
    name = "VMeasureScore"

    @staticmethod
    def compute_value(labels: Sequence[int], true_labels: Sequence[int]):
        return sklearn.metrics.v_measure_score(labels, true_labels)


class AdjustedRandScore(RemovedCommonNoiseSupervisedMetric):
    name = "AdjustedRandScore"
    worstValue = -1

    @staticmethod
    def compute_value(labels: Sequence[int], true_labels: Sequence[int]):
        return sklearn.metrics.adjusted_rand_score(labels, true_labels)


class FowlkesMallowsScore(RemovedCommonNoiseSupervisedMetric):
    name = "FowlkesMallowsScore"

    @staticmethod
    def compute_value(labels: Sequence[int], true_labels: Sequence[int]):
        return sklearn.metrics.fowlkes_mallows_score(labels, true_labels)


class AdjustedMutualInfoScore(RemovedCommonNoiseSupervisedMetric):
    name = "AdjustedMutualInfoScore"

    @staticmethod
    def compute_value(labels: Sequence[int], true_labels: Sequence[int]):
        return sklearn.metrics.adjusted_mutual_info_score(labels, true_labels)


class ClusteringSupervisedEvalStats(ClusterLabelsEvalStats[ClusteringSupervisedMetric]):
    """
    Class containing methods to compute evaluation statistics of a clustering result based on ground truth clusters
    """

    def __init__(self, labels: Sequence[int], true_labels: Sequence[int], noise_label=-1,
            metrics: Sequence[ClusteringSupervisedMetric] = None,
            additional_metrics: Sequence[ClusteringSupervisedMetric] = None):
        """
        :param labels: sequence of labels, usually the output of some clustering algorithm
        :param true_labels: sequence of labels that represent the ground truth clusters
        :param noise_label: the label that denotes noise (datapoints not assigned to any cluster)
        :param metrics: the metrics to compute. If None, will compute default metrics
        :param additional_metrics: the metrics to additionally compute
        """
        if len(labels) != len(true_labels):
            raise ValueError("true_labels must be of the same length as labels")
        self.trueLabels = np.array(true_labels)
        self._labels_with_removed_common_noise = None
        if metrics is None:
            metrics = [VMeasureScore(), FowlkesMallowsScore(), AdjustedRandScore(), AdjustedMutualInfoScore()]
        super().__init__(labels, noise_label, metrics, additional_metrics=additional_metrics)

    @classmethod
    def from_model(cls, clustering_model: EuclideanClusterer, true_labels: Sequence[int]):
        return cls(clustering_model.labels, true_labels, noise_label=clustering_model.noiseLabel)

    def labels_with_removed_common_noise(self) -> Tuple[np.ndarray, np.ndarray]:
        """
        :return: tuple (labels, true_labels) from which points that are labeled as noise in both
            the predicted and the ground truth labels have been removed
        """
        if self._labels_with_removed_common_noise is None:
            if self.noiseLabel is None:
                self._labels_with_removed_common_noise = self.labels, self.trueLabels
            else:
                common_noise_labels_mask = np.logical_and(self.noiseLabelsMask, self.trueLabels == self.noiseLabel)
                kept_labels_mask = np.logical_not(common_noise_labels_mask)
                self._labels_with_removed_common_noise = self.labels[kept_labels_mask], self.trueLabels[kept_labels_mask]
        return self._labels_with_removed_common_noise
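

# A hedged usage sketch (illustrative only; the helper name and data are assumptions,
# not part of the original module): the supervised metrics are permutation-invariant,
# so a labeling that matches the ground truth up to a renaming of cluster ids scores 1.0.
def _demo_supervised_eval_stats() -> Dict[str, float]:
    true_labels = [0, 0, 1, 1, -1]
    predicted_labels = [1, 1, 0, 0, -1]  # same partition with cluster ids swapped, one common noise point
    stats = ClusteringSupervisedEvalStats(predicted_labels, true_labels, noise_label=-1)
    return stats.metrics_dict()  # VMeasureScore, AdjustedRandScore etc. all evaluate to 1.0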