Coverage for src/sensai/geoanalytics/geopandas/coordinate_clustering_ground_truth.py: 22%

58 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-13 22:17 +0000

1""" 

2This module contains utilities for retrieving and visualizing ground truth labels for evaluating clustering algorithms 

3""" 

4 

5import geopandas as gp 

6import logging 

7import numpy as np 

8from geopandas import GeoDataFrame 

9from shapely.geometry import Polygon, MultiPoint, MultiPolygon 

10from typing import Sequence, Union, Optional 

11 

12from .coordinates import extract_coordinates_array, TCoordinates, GeoDataFrameWrapper 

13 

14log = logging.getLogger(__name__) 

15 

16 

class PolygonAnnotatedCoordinates(GeoDataFrameWrapper):
    """
    Class for retrieving ground truth cluster labels from a set of coordinate points and polygons.
    From the provided 2-dim. coordinates only points within the ground truth region will be considered.
    """

    def __init__(self, coordinates: TCoordinates, ground_truth_polygons: Union[str, Sequence[Polygon], GeoDataFrame],
                 noise_label: Optional[int] = -1):
        """
        :param coordinates: coordinates of points. These points should be spread over an area larger or equal to
            the ground truth area
        :param ground_truth_polygons: sequence of polygons, GeoDataFrame or path to a shapefile containing such a
            sequence. The polygons represent the ground truth for clustering.
            *Important*: the first polygon in the sequence is assumed to be the region within
            which ground truth was provided and has to cover all remaining polygons. This also means that all
            non-noise clusters in that region should be covered by a polygon
        :param noise_label: label to associate with noise or None
        :raises Exception: if the region contains no datapoints, if noise_label is None although some datapoints
            of the region lie outside of all cluster polygons, if cluster polygons intersect each other or if a
            cluster polygon contains no datapoints
        """

        # The constructor might seem bloated but it really mostly does input validation for the polygons
        coordinates = extract_coordinates_array(coordinates)
        polygons: Sequence[Polygon]
        if isinstance(ground_truth_polygons, str):
            polygons = gp.read_file(ground_truth_polygons).geometry.values
        elif isinstance(ground_truth_polygons, GeoDataFrame):
            polygons = ground_truth_polygons.geometry.values
        else:
            polygons = ground_truth_polygons

        # By convention the first polygon is the annotated region, all following ones are clusters
        self.regionPolygon = polygons[0]
        self.noiseLabel = noise_label
        self.clusterPolygons = MultiPolygon(polygons[1:])
        # Everything inside the region that is not covered by a cluster polygon counts as noise
        self.noisePolygon = self.regionPolygon.difference(self.clusterPolygons)

        self.regionMultipoint = MultiPoint(coordinates).intersection(self.regionPolygon)
        if self.regionMultipoint.is_empty:
            raise Exception("The ground truth region contains no datapoints. "
                            "This can happen if you have provided unsuitable coordinates")

        self.noiseMultipoint = self.regionMultipoint.intersection(self.noisePolygon)
        # What matters is whether any *datapoint* is noise; the noise polygon itself is non-empty
        # whenever the cluster polygons do not cover the region completely.
        if self.noiseLabel is None and not self.noiseMultipoint.is_empty:
            num_noise_points = len(self._points(self.noiseMultipoint))
            raise Exception(f"No noise_label was provided but there is noise: {num_noise_points} datapoints "
                            f"in annotated area do not belong to any cluster polygon")

        self.clustersMultipoints = []
        # Union of all cluster polygons seen so far, used for detecting overlaps
        covered_area = Polygon()
        # Multi-part geometries are iterated via .geoms (direct iteration is not supported by Shapely 2.0);
        # the index starts at 1 because index 0 in the input sequence is the region polygon
        for i, cluster_polygon in enumerate(self.clusterPolygons.geoms, start=1):
            if not covered_area.intersection(cluster_polygon).is_empty:
                raise Exception(f"The polygons should be non-intersecting: polygon {i} intersects with previous polygons")
            covered_area = covered_area.union(cluster_polygon)
            cluster_multipoint = self.regionMultipoint.intersection(cluster_polygon)
            if cluster_multipoint.is_empty:
                raise Exception(f"The annotated cluster for polygon {i} is empty - check your data!")
            self.clustersMultipoints.append(cluster_multipoint)

    @staticmethod
    def _points(geometry) -> list:
        """
        :param geometry: a point geometry, i.e. a (possibly empty) multi-part geometry of points or a single point
        :return: list of the individual points contained in the geometry (empty list for an empty geometry)
        """
        if geometry.is_empty:
            return []
        # The result of an intersection is not guaranteed to be a multi-part geometry (it may be a single Point),
        # in which case there is no .geoms attribute and the geometry itself is the only point
        return list(getattr(geometry, "geoms", [geometry]))

    def to_geodf(self, crs='epsg:3857', include_noise=True):
        """
        :param crs: the coordinate reference system passed on to the GeoDataFrame
        :param include_noise: whether to include the noise datapoints as the first row (labeled with the noise label);
            has no effect if the noise label is None
        :return: GeoDataFrame with clusters as MultiPoint instance indexed by the clusters' identifiers
        """
        clusters = self.clustersMultipoints
        first_label = 0
        if self.noiseLabel is not None and include_noise:
            # builds a new list, i.e. self.clustersMultipoints is left untouched
            clusters = [self.noiseMultipoint] + clusters
            first_label = self.noiseLabel
        identifiers = list(range(first_label, first_label + len(clusters)))
        gdf = gp.GeoDataFrame({"geometry": clusters, "identifier": identifiers}, crs=crs)
        gdf.set_index("identifier", drop=True, inplace=True)
        return gdf

    def plot(self, include_noise=True, **kwargs):
        """
        Plots the ground truth clusters, assigning a random color value to each cluster and
        the fixed color value 0 to the noise (if it is included)

        :param include_noise: whether to include the noise datapoints in the plot
        :param kwargs: passed on to the plot method of the GeoDataFrame
        """
        gdf = self.to_geodf(include_noise=include_noise)
        gdf["color"] = np.random.random(len(gdf))
        if include_noise and self.noiseLabel is not None:
            gdf.loc[self.noiseLabel, "color"] = 0
        gdf.plot(column="color", **kwargs)

    def get_coordinates_labels(self):
        """
        Extract cluster coordinates and labels as numpy arrays from the provided ground truth region and
        cluster polygons

        :return: tuple of arrays of the type (coordinates, labels)
        """
        coords, labels = [], []
        for row in self.to_geodf(include_noise=True).itertuples():
            points = self._points(row.geometry)
            coords.extend([p.x, p.y] for p in points)
            labels.extend([row.Index] * len(points))
        return np.array(coords), np.array(labels)