Coverage for src/sensai/geoanalytics/geo_clustering.py: 0%

163 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-13 22:17 +0000

1import collections 

2import itertools 

3import math 

4from abc import abstractmethod, ABC 

5from typing import List, Tuple, Iterator, Optional 

6 

7import numpy as np 

8import sklearn.cluster 

9 

10from .geo_coords import GeoCoord 

11from .local_coords import LocalCoordinateSystem 

12from ..clustering import GreedyAgglomerativeClustering 

13 

14 

15class GeoCoordClusterer(ABC): 

16 @abstractmethod 

17 def fit_geo_coords(self, geo_coords: List[GeoCoord]): 

18 """ 

19 :param geo_coords: the coordinates to be clustered 

20 """ 

21 pass 

22 

23 @abstractmethod 

24 def clusters_indices(self) -> Tuple[List[List[int]], List[int]]: 

25 """ 

26 :return: a tuple (clusters, outliers), where clusters is a list of point indices, one list for each 

27 cluster containing the indices of points within the cluster, and outliers is the list of indices of points not within 

28 clusters 

29 """ 

30 pass 

31 

32 

33class GreedyAgglomerativeGeoCoordClusterer(GeoCoordClusterer): 

34 def __init__(self, 

35 max_min_distance_for_merge_m: float, 

36 max_distance_m: float, 

37 min_cluster_size: int, 

38 lcs: LocalCoordinateSystem = None): 

39 """ 

40 :param max_min_distance_for_merge_m: the maximum distance, in metres, for the minimum distance between two existing clusters for a merge 

41 to be admissible 

42 :param max_distance_m: the maximum distance, in metres, between any two points for the points to be allowed to be in the same cluster 

43 :param min_cluster_size: the minimum number of points any valid cluster must ultimately contain; the points in any smaller clusters 

44 shall be considered as outliers 

45 :param lcs: the local coordinate system to use for clustering; if None, compute based on mean coordinates passed when fitting 

46 """ 

47 self.lcs = lcs 

48 self.min_cluster_size = min_cluster_size 

49 self.max_min_distance_for_merge = max_min_distance_for_merge_m 

50 self.max_distance_m = max_distance_m 

51 self.squared_max_min_distance_for_merge = max_min_distance_for_merge_m * max_min_distance_for_merge_m 

52 self.squared_max_distance = max_distance_m * max_distance_m 

53 self.local_points = None 

54 self.m_min_distance: Optional["GreedyAgglomerativeGeoCoordClusterer.Matrix"] = None 

55 self.m_max_squared_distance: Optional["GreedyAgglomerativeGeoCoordClusterer.Matrix"] = None 

56 self.clusters = None 

57 

58 class Matrix: 

59 UNSET_VALUE = np.inf 

60 

61 def __init__(self, dim: int): 

62 self.m = np.empty((dim, dim)) 

63 self.m.fill(np.inf) 

64 

65 def set(self, c1: int, c2: int, value: float): 

66 self.m[c1][c2] = value 

67 self.m[c2][c1] = value 

68 

69 def get(self, c1: int, c2: int) -> float: 

70 return self.m[c1][c2] 

71 

72 class LocalPoint: 

73 def __init__(self, xy: np.ndarray, idx: int): 

74 self.idx = idx 

75 self.xy = xy 

76 

77 class Cluster(GreedyAgglomerativeClustering.Cluster): 

78 def __init__(self, point: "GreedyAgglomerativeGeoCoordClusterer.LocalPoint", idx: int, 

79 clusterer: 'GreedyAgglomerativeGeoCoordClusterer'): 

80 self.idx = idx 

81 self.clusterer = clusterer 

82 self.points = [point] 

83 

84 def merge_cost(self, other: "GreedyAgglomerativeGeoCoordClusterer.Cluster"): 

85 cartesian_product = itertools.product(self.points, other.points) 

86 min_squared_distance = math.inf 

87 max_squared_distance = 0 

88 for p1, p2 in cartesian_product: 

89 diff = p1.xy - p2.xy 

90 squared_distance = np.dot(diff, diff) 

91 if squared_distance > self.clusterer.squared_max_distance: 

92 max_squared_distance = math.inf 

93 break 

94 else: 

95 min_squared_distance = min(squared_distance, min_squared_distance) 

96 

97 # fill cache: the max value takes precedence; if it is inf (no merge admissible), then the min value is also set to inf; 

98 # the min value valid only if the max value is finite 

99 self.clusterer.m_max_squared_distance.set(self.idx, other.idx, max_squared_distance) 

100 if np.isinf(max_squared_distance): 

101 self.clusterer.m_min_distance.set(self.idx, other.idx, np.inf) 

102 else: 

103 self.clusterer.m_min_distance.set(self.idx, other.idx, min_squared_distance) 

104 

105 if np.isinf(max_squared_distance): 

106 return math.inf 

107 if min_squared_distance <= self.clusterer.squared_max_min_distance_for_merge: 

108 return min_squared_distance 

109 return math.inf 

110 

111 def merge(self, other): 

112 self.points += other.points 

113 

114 def fit_geo_coords(self, geo_coords: List[GeoCoord]) -> None: 

115 self.m_min_distance = self.Matrix(len(geo_coords)) 

116 self.m_max_squared_distance = self.Matrix(len(geo_coords)) 

117 if self.lcs is None: 

118 mean_coord = GeoCoord.mean_coord(geo_coords) 

119 self.lcs = LocalCoordinateSystem(mean_coord.lat, mean_coord.lon) 

120 self.local_points = [self.LocalPoint(np.array(self.lcs.get_local_coords(p.lat, p.lon)), idx) for idx, p in enumerate(geo_coords)] 

121 clusters = [self.Cluster(lp, i, self) for i, lp in enumerate(self.local_points)] 

122 gac = GreedyAgglomerativeClustering(clusters, 

123 merge_candidate_determination_strategy=self.MergeCandidateDeterminationStrategy(self.max_distance_m, self)) 

124 clusters = gac.apply_clustering() 

125 self.clusters = clusters 

126 

127 def clusters_indices(self) -> Tuple[List[List[int]], List[int]]: 

128 outliers = [] 

129 clusters = [] 

130 for c in self.clusters: 

131 indices = [p.idx for p in c.points] 

132 if len(c.points) < self.min_cluster_size: 

133 outliers.extend(indices) 

134 else: 

135 clusters.append(indices) 

136 return clusters, outliers 

137 

138 class MergeCandidateDeterminationStrategy(GreedyAgglomerativeClustering.MergeCandidateDeterminationStrategy): 

139 def __init__(self, search_radius_m: float, parent: "GreedyAgglomerativeGeoCoordClusterer"): 

140 super().__init__() 

141 self.parent = parent 

142 self.searchRadiusM = search_radius_m 

143 

144 def set_clusterer(self, clusterer: GreedyAgglomerativeClustering): 

145 super().set_clusterer(clusterer) 

146 points = [] 

147 for wc in self.clusterer.wrapped_clusters: 

148 c: GreedyAgglomerativeGeoCoordClusterer.Cluster = wc.cluster 

149 for p in c.points: 

150 points.append(p.xy) 

151 assert len(points) == len(self.clusterer.wrapped_clusters) 

152 points = np.stack(points) 

153 self.kdtree = sklearn.neighbors.KDTree(points) 

154 

155 def iter_candidate_indices(self, wc: "GreedyAgglomerativeClustering.WrappedCluster", initial: bool, 

156 merged_cluster_indices: Tuple[int, int] = None) -> Iterator[int]: 

157 c: GreedyAgglomerativeGeoCoordClusterer.Cluster = wc.cluster 

158 if initial: 

159 local_point = c.points[0] # pick any point from wc, since we use maximum cluster extension as search radius 

160 indices = self.kdtree.query_radius(np.reshape(local_point.xy, (1, 2)), self.searchRadiusM)[0] 

161 candidate_set = set() 

162 for idx in indices: 

163 wc = self.clusterer.wrapped_clusters[idx] 

164 candidate_set.add(wc.get_cluster_association().idx) 

165 yield from sorted(candidate_set) 

166 else: 

167 # The new distance values (max/min) between wc and any cluster index otherIdx can be computed from the cached distance 

168 # values of the two clusters from which wc was created through a merge: 

169 # The max distance is the maximum of the squared distances of the original clusters (and if either is inf, then 

170 # a merge is definitely inadmissible, because one of the original clusters was already too far away). 

171 # The min distance is the minimum of the squred distances of the original clusters. 

172 c1, c2 = merged_cluster_indices 

173 max1 = self.parent.m_max_squared_distance.m[c1] 

174 max2 = self.parent.m_max_squared_distance.m[c2] 

175 max_combined = np.maximum(max1, max2) 

176 for otherIdx, maxSqDistance in enumerate(max_combined): 

177 min_sq_distance = np.inf 

178 if maxSqDistance <= self.parent.squared_max_distance: 

179 wc_other = self.clusterer.wrapped_clusters[otherIdx] 

180 if wc_other.is_merged(): 

181 continue 

182 min1 = self.parent.m_min_distance.get(c1, otherIdx) 

183 min2 = self.parent.m_min_distance.get(c2, otherIdx) 

184 min_sq_distance = min(min1, min2) 

185 if min_sq_distance <= self.parent.squared_max_min_distance_for_merge: 

186 yield GreedyAgglomerativeClustering.ClusterMerge(wc, wc_other, min_sq_distance) 

187 # update cache 

188 self.parent.m_max_squared_distance.set(wc.idx, otherIdx, maxSqDistance) 

189 self.parent.m_min_distance.set(wc.idx, otherIdx, min_sq_distance) 

190 

191 

192class SkLearnGeoCoordClusterer(GeoCoordClusterer): 

193 def __init__(self, clusterer, lcs: LocalCoordinateSystem = None): 

194 """ 

195 :param clusterer: a clusterer from sklearn.cluster 

196 :param lcs: the local coordinate system to use for Euclidian conversion; if None, determine from data (using mean coordinate as 

197 centre) 

198 """ 

199 self.lcs = lcs 

200 self.clusterer = clusterer 

201 self.local_points = None 

202 

203 def fit_geo_coords(self, geo_coords: List[GeoCoord]): 

204 if self.lcs is None: 

205 mean_coord = GeoCoord.mean_coord(geo_coords) 

206 self.lcs = LocalCoordinateSystem(mean_coord.lat, mean_coord.lon) 

207 self.local_points = [self.lcs.get_local_coords(p.lat, p.lon) for p in geo_coords] 

208 self.clusterer.fit(self.local_points) 

209 

210 def _clusters(self, mode): 

211 clusters = collections.defaultdict(list) 

212 outliers = [] 

213 for idxPoint, idxCluster in enumerate(self.clusterer.labels_): 

214 if mode == "localPoints": 

215 item = self.local_points[idxPoint] 

216 elif mode == "indices": 

217 item = idxPoint 

218 else: 

219 raise ValueError() 

220 if idxCluster >= 0: 

221 clusters[idxCluster].append(item) 

222 else: 

223 outliers.append(item) 

224 return list(clusters.values()), outliers 

225 

226 def clusters_local_points(self) -> Tuple[List[List[Tuple[float, float]]], List[Tuple[float, float]]]: 

227 """ 

228 :return: a tuple (clusters, outliers), where clusters is a dictionary mapping from cluster index to 

229 the list of local points within the cluster and outliers is a list of local points not within 

230 clusters 

231 """ 

232 return self._clusters("localPoints") 

233 

234 def clusters_indices(self) -> Tuple[List[List[int]], List[int]]: 

235 return self._clusters("indices") 

236 

237 

238class DBSCANGeoCoordClusterer(SkLearnGeoCoordClusterer): 

239 def __init__(self, eps, min_samples, lcs: LocalCoordinateSystem = None, **kwargs): 

240 """ 

241 :param eps: the maximum distance between two samples for one to be considered as in the neighbourhood of the other 

242 :param min_samples: the minimum number of samples that must be within a neighbourhood for a cluster to be formed 

243 :param lcs: the local coordinate system for conversion to a Euclidian space 

244 :param kwargs: additional arguments to pass to DBSCAN (see https://scikit-learn.org/stable/modules/generated/sklearn.cluster.DBSCAN.html) 

245 """ 

246 super().__init__(sklearn.cluster.DBSCAN(eps=eps, min_samples=min_samples, **kwargs), lcs)