Coverage for src/sensai/geoanalytics/geopandas/coordinate_clustering.py: 46%

70 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-13 22:17 +0000

1import logging 

2from typing import Callable, Union, Iterable 

3 

4import geopandas as gp 

5import numpy as np 

6import pandas as pd 

7from shapely.geometry import MultiPoint 

8 

9from .coordinates import validate_coordinates, extract_coordinates_array, TCoordinates, GeoDataFrameWrapper 

10from ...clustering import SkLearnEuclideanClusterer 

11from ...clustering.clustering_base import EuclideanClusterer 

12from ...clustering.sklearn_clustering import SkLearnClustererProtocol 

13from ...util.cache import LoadSaveInterface 

14from ...util.profiling import timed 

15 

16log = logging.getLogger(__name__) 

17 

18 

class CoordinateEuclideanClusterer(EuclideanClusterer, GeoDataFrameWrapper):
    """
    Wrapper around a clustering model. This class adds additional, geospatial-specific features to the provided
    clusterer: coordinate validation, fitting from geometry-like inputs, and export of clusters as GeoDataFrames.

    :param clusterer: an instance of EuclideanClusterer; its noise label and cluster size bounds are
        passed on to the base class constructor and its ``_compute_labels`` is used for labelling
    """
    def __init__(self, clusterer: EuclideanClusterer):
        self.clusterer = clusterer
        # NOTE(review): the camelCase attribute names are taken as-is from the wrapped clusterer;
        # confirm they match the attributes defined by EuclideanClusterer.
        super().__init__(noise_label=clusterer.noiseLabel,
            max_cluster_size=clusterer.maxClusterSize, min_cluster_size=clusterer.minClusterSize)

    class Cluster(EuclideanClusterer.Cluster, GeoDataFrameWrapper, LoadSaveInterface):
        """
        Wrapper around a coordinates array

        :param coordinates: array of coordinates; it is checked with ``validate_coordinates`` before being stored
        :param identifier: identifier of the cluster, used as index value when exporting to a GeoDataFrame
        """

        def __init__(self, coordinates: np.ndarray, identifier: Union[str, int]):
            validate_coordinates(coordinates)
            super().__init__(coordinates, identifier)

        def to_geodf(self, crs='epsg:3857'):
            """
            Export the cluster as a GeoDataFrame of length 1 with the cluster as an instance of
            MultiPoint and the identifier as index.

            :param crs: projection. By default pseudo-mercator
            :return: GeoDataFrame
            """
            # the CRS is handed to the constructor so the geometry column is created with it directly
            gdf = gp.GeoDataFrame({"geometry": [self.as_multipoint()]}, index=[self.identifier], crs=crs)
            gdf.index.name = "identifier"
            return gdf

        def as_multipoint(self):
            """
            :return: The cluster's coordinates as a MultiPoint object
            """
            return MultiPoint(self.datapoints)

        @classmethod
        def load(cls, path):
            """
            Instantiate from a geopandas readable file containing a single row with an identifier and an instance
            of MultiPoint

            :param path: path of the file to read with ``geopandas.read_file``
            :return: instance of CoordinateCluster
            :raises ValueError: if the file does not contain exactly one row
            """
            log.info(f"Loading instance of {cls.__name__} from {path}")
            gdf = gp.read_file(path)
            if len(gdf) != 1:
                raise ValueError(f"Expected {path} to contain a single row, instead got {len(gdf)}")
            identifier, multipoint = gdf.identifier.values[0], gdf.geometry.values[0]
            # multi-part geometries are not iterable in Shapely >= 2.0; ``.geoms`` works across versions
            return cls(np.array([[p.x, p.y] for p in multipoint.geoms]), identifier)

        def save(self, path, crs="EPSG:3857"):
            """
            Saves the cluster's coordinates as shapefile

            :param crs: projection assigned to the exported GeoDataFrame
            :param path: destination passed to ``GeoDataFrame.to_file``
            :return:
            """
            log.info(f"Saving instance of {self.__class__.__name__} as shapefile to {path}")
            # index=True writes the identifier index as a column, which ``load`` reads back
            self.to_geodf(crs).to_file(path, index=True)

    def _compute_labels(self, x: np.ndarray) -> np.ndarray:
        """
        Validate the coordinates and delegate the label computation to the wrapped clusterer.

        :param x: array of coordinates
        :return: the labels returned by the wrapped clusterer
        """
        validate_coordinates(x)
        return self.clusterer._compute_labels(x)

    def fit(self, coordinates: TCoordinates):
        """
        Fitting to coordinates from a numpy array, a MultiPoint object or a GeoDataFrame with one Point per row

        :param coordinates: the coordinates to cluster; converted with ``extract_coordinates_array`` before fitting
        :return:
        """
        coordinates = extract_coordinates_array(coordinates)
        super().fit(coordinates)

    @timed
    def to_geodf(self, condition: Callable[[Cluster], bool] = None, crs='epsg:3857',
            include_noise=False) -> gp.GeoDataFrame:
        """
        GeoDataFrame containing all clusters found by the model.
        It is a concatenation of GeoDataFrames of individual clusters

        :param condition: if provided, only clusters fulfilling the condition will be included
        :param crs: projection. By default pseudo-mercator
        :param include_noise: whether to append the noise cluster as an additional row
        :return: GeoDataFrame with all clusters indexed by their identifier
        """
        # collect the per-cluster frames first and concatenate once instead of growing a frame in the loop
        frames = [cluster.to_geodf(crs=crs) for cluster in self.clusters(condition)]
        if include_noise:
            frames.append(self.noise_cluster().to_geodf(crs=crs))
        if not frames:
            # nothing to export: return an empty frame that still has a geometry column carrying the CRS
            return gp.GeoDataFrame(geometry=[], crs=crs)
        return pd.concat(frames)

    def plot(self, include_noise=False, condition=None, **kwargs):
        """
        Plots the resulting clusters with random coloring

        :param include_noise: Whether to include the noise cluster
        :param condition: If provided, only clusters fulfilling this condition will be included
        :param kwargs: passed to GeoDataFrame.plot
        :return:
        """
        geodf = self.to_geodf(condition=condition, include_noise=include_noise)
        geodf["color"] = np.random.random(len(geodf))
        if include_noise:
            # the noise cluster gets the fixed color value 0 instead of a random one
            geodf.loc[self.noiseLabel, "color"] = 0
        geodf.plot(column="color", **kwargs)

    # the overriding of the following methods is only necessary for getting the type annotations right
    # if mypy ever permits annotating nested classes correctly, these methods can be removed
    def get_cluster(self, cluster_id: int) -> Cluster:
        return super().get_cluster(cluster_id)

    def noise_cluster(self) -> Cluster:
        return super().noise_cluster()

    def clusters(self, condition: Callable[[Cluster], bool] = None) -> Iterable[Cluster]:
        return super().clusters(condition=condition)

148 

149 

class SkLearnCoordinateClustering(CoordinateEuclideanClusterer):
    """
    Coordinate clusterer backed by a sklearn-style clusterer. The given clusterer is wrapped in a
    SkLearnEuclideanClusterer, which is then handed to CoordinateEuclideanClusterer, so that relabelling
    and the convenience methods for geospatial data become available.

    :param clusterer: a clusterer object compatible with the sklearn API
    :param noise_label: label that is associated with the noise cluster or None
    :param min_cluster_size: if not None, clusters below this size will be labeled as noise
    :param max_cluster_size: if not None, clusters above this size will be labeled as noise
    """
    def __init__(self, clusterer: SkLearnClustererProtocol, noise_label=-1,
            min_cluster_size: int = None, max_cluster_size: int = None):
        size_bounds = dict(min_cluster_size=min_cluster_size, max_cluster_size=max_cluster_size)
        euclidean_clusterer = SkLearnEuclideanClusterer(clusterer, noise_label=noise_label, **size_bounds)
        super().__init__(euclidean_clusterer)