Coverage for src/sensai/clustering/clustering_base.py: 35%
110 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-29 18:29 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-29 18:29 +0000
1import logging
2from abc import ABC, abstractmethod
3from typing import Union, Set, Callable, Iterable, Optional
5import numpy as np
6import pandas as pd
7from scipy.spatial import distance_matrix
9from ..util.cache import PickleLoadSaveMixin
11log = logging.getLogger(__name__)
14# TODO at some point in the future: generalize to other input and deal with algorithms that allow prediction of labels
class EuclideanClusterer(PickleLoadSaveMixin, ABC):
    """
    Base class for all clustering algorithms. Supports noise clusters and relabelling of identified clusters as noise
    based on their size.

    Subclasses implement :meth:`_compute_labels`; :meth:`fit` stores the datapoints together with the (possibly
    relabelled) labels, after which the clusters can be retrieved via :meth:`get_cluster` and :meth:`clusters`.

    :param noise_label: label that is associated with the noise cluster or None
    :param min_cluster_size: if not None, clusters below this size will be labeled as noise
    :param max_cluster_size: if not None, clusters above this size will be labeled as noise
    """
    def __init__(self, noise_label=-1, min_cluster_size: Optional[int] = None, max_cluster_size: Optional[int] = None):
        self._datapoints: Optional[np.ndarray] = None
        self._labels: Optional[np.ndarray] = None
        self._clusterIdentifiers: Optional[Set[int]] = None
        self._nonNoiseClusterIdentifiers: Optional[Set[int]] = None

        # size bounds are enforced by relabelling to the noise label, hence a noise label is required for them
        if min_cluster_size is not None or max_cluster_size is not None:
            if noise_label is None:
                raise ValueError("the noise label has to be not None for non-trivial bounds on cluster sizes")
        self.noiseLabel = noise_label
        # missing bounds are represented by +/- infinity such that the bound check in fit is always satisfied
        self.maxClusterSize = max_cluster_size if max_cluster_size is not None else np.inf
        self.minClusterSize = min_cluster_size if min_cluster_size is not None else -np.inf

        # cache of Cluster objects by cluster identifier, filled lazily by get_cluster
        self._clusterDict = {}
        self._numClusters: Optional[int] = None

    class Cluster:
        """
        A single cluster: the datapoints that share a label, with lazily computed (and cached) centroid and radius.

        :param datapoints: array of the datapoints that belong to the cluster
        :param identifier: the label of the cluster
        """
        def __init__(self, datapoints: np.ndarray, identifier: Union[int, str]):
            self.datapoints = datapoints
            self.identifier = identifier
            self._radius: Optional[float] = None
            self._centroid: Optional[np.ndarray] = None

        def __len__(self):
            return len(self.datapoints)

        def __str__(self):
            return f"{self.__class__.__name__}_{self.identifier}"

        def _compute_radius(self):
            # largest distance between the centroid and any member of the cluster
            return np.max(distance_matrix([self.centroid()], self.datapoints))

        def _compute_centroid(self):
            return np.mean(self.datapoints, axis=0)

        def centroid(self):
            """
            :return: the mean of the cluster's datapoints (computed on first access, then cached)
            """
            if self._centroid is None:
                self._centroid = self._compute_centroid()
            return self._centroid

        def radius(self):
            """
            :return: the maximal distance of a datapoint to the centroid (computed on first access, then cached)
            """
            if self._radius is None:
                self._radius = self._compute_radius()
            return self._radius

        def summary_dict(self):
            """
            :return: dictionary containing coarse information about the cluster (e.g. num_members and centroid)
            """
            return {
                "identifier": self.identifier,
                "centroid": self.centroid(),
                "numMembers": len(self),
                "radius": self.radius()
            }

    @classmethod
    def __str__(cls):
        return cls.__name__

    def clusters(self, condition: Callable[[Cluster], bool] = None) -> Iterable[Cluster]:
        """
        :param condition: if provided, only clusters fulfilling the condition will be included
        :return: generator of clusters
        """
        num_clusters = self.num_clusters
        next_percentage_to_log = 0
        for i, cluster_id in enumerate(self._nonNoiseClusterIdentifiers):
            # logging progress through the loop in steps of (at least) 5%; the comparison must be ">=" since for
            # small numbers of clusters the integer percentage jumps over the multiples of 5
            percentage_generated = int(100 * i / num_clusters)
            if percentage_generated >= next_percentage_to_log:
                log.info(f"Processed {percentage_generated}% of clusters")
                next_percentage_to_log = 5 * (percentage_generated // 5) + 5

            cluster = self.get_cluster(cluster_id)
            if condition is None or condition(cluster):
                yield cluster

    def noise_cluster(self):
        """
        :return: the cluster holding all datapoints that carry the noise label
        :raises NotImplementedError: if the clusterer was configured without a noise label
        :raises KeyError: if no datapoint was labelled as noise
        """
        if self.noiseLabel is None:
            raise NotImplementedError(f"The algorithm {self} does not provide a noise cluster")
        return self.get_cluster(self.noiseLabel)

    def summary_df(self, condition: Callable[[Cluster], bool] = None):
        """
        :param condition: if provided, only clusters fulfilling the condition will be included
        :return: pandas DataFrame containing coarse information about the clusters
        """
        summary_dicts = [cluster.summary_dict() for cluster in self.clusters(condition=condition)]
        if not summary_dicts:
            # a DataFrame built from an empty list has no "identifier" column, so set_index would raise a KeyError
            return pd.DataFrame(
                columns=["centroid", "numMembers", "radius"],
                index=pd.Index([], name="identifier"),
            )
        return pd.DataFrame(summary_dicts).set_index("identifier", drop=True)

    def fit(self, data: np.ndarray) -> None:
        """
        Computes the cluster labels for the given data and stores data and labels in this object.
        Clusters violating the configured size bounds are relabelled as noise.

        :param data: the datapoints to cluster
        """
        log.info(f"Fitting {self} to {len(data)} coordinate datapoints.")
        labels = self._compute_labels(data)
        if len(labels) != len(data):
            raise Exception("Bad Implementation: number of labels does not match number of datapoints")
        # Work on an own ndarray: the relabelling below must neither modify an array owned by the implementation
        # nor fail if the implementation returned a plain list
        labels = np.array(labels)
        # Relabel clusters that do not fulfill size bounds as noise
        if self.minClusterSize != -np.inf or self.maxClusterSize != np.inf:
            for cluster_id, cluster_size in zip(*np.unique(labels, return_counts=True)):
                if not self.minClusterSize <= cluster_size <= self.maxClusterSize:
                    labels[labels == cluster_id] = self.noiseLabel

        self._datapoints = data
        self._clusterIdentifiers = set(labels)
        self._labels = labels
        # clusters cached by a previous fit refer to the old data and labels
        self._clusterDict = {}
        if self.noiseLabel is not None:
            self._nonNoiseClusterIdentifiers = self._clusterIdentifiers.difference({self.noiseLabel})
        else:
            # without a noise label, every identified cluster is a regular cluster
            self._nonNoiseClusterIdentifiers = set(self._clusterIdentifiers)
        log.info(f"{self} found {self.num_clusters} clusters")

    @property
    def is_fitted(self):
        return self._datapoints is not None

    @property
    def datapoints(self) -> np.ndarray:
        assert self.is_fitted
        return self._datapoints

    @property
    def labels(self) -> np.ndarray:
        assert self.is_fitted
        return self._labels

    @property
    def cluster_identifiers(self) -> Set[int]:
        assert self.is_fitted
        return self._clusterIdentifiers

    # unfortunately, there seems to be no way to annotate the return type correctly
    # https://github.com/python/mypy/issues/3993
    def get_cluster(self, cluster_id: int) -> Cluster:
        """
        :param cluster_id: the identifier (label) of the cluster; may be the noise label
        :return: the cluster with the given identifier (created on first access, then cached)
        :raises KeyError: if no datapoint carries the given label
        """
        # membership test on the set of identifiers instead of scanning the full label array on every call
        if cluster_id not in self.cluster_identifiers:
            raise KeyError(f"no cluster for id {cluster_id}")
        result = self._clusterDict.get(cluster_id)
        if result is None:
            result = self.Cluster(self.datapoints[self.labels == cluster_id], identifier=cluster_id)
            self._clusterDict[cluster_id] = result
        return result

    @property
    def num_clusters(self) -> int:
        """
        :return: the number of clusters, not counting the noise cluster
        """
        return len(self._nonNoiseClusterIdentifiers)

    @abstractmethod
    def _compute_labels(self, x: np.ndarray) -> np.ndarray:
        """
        Fit the clustering model and return an array of integer cluster labels

        :param x: the datapoints
        :return: list of the same length as the input datapoints; it represents the mapping coordinate -> cluster_id
        """
        pass