Coverage for src/sensai/geoanalytics/geopandas/coordinate_clustering_ground_truth.py: 22%
58 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-29 18:29 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-29 18:29 +0000
1"""
2This module contains utilities for retrieving and visualizing ground truth labels for evaluating clustering algorithms
3"""
5import geopandas as gp
6import logging
7import numpy as np
8from geopandas import GeoDataFrame
9from shapely.geometry import Polygon, MultiPoint, MultiPolygon
10from typing import Sequence, Union, Optional
12from .coordinates import extract_coordinates_array, TCoordinates, GeoDataFrameWrapper
14log = logging.getLogger(__name__)
class PolygonAnnotatedCoordinates(GeoDataFrameWrapper):
    """
    Class for retrieving ground truth cluster labels from a set of coordinate points and polygons.
    From the provided 2-dim. coordinates only points within the ground truth region will be considered.
    """

    def __init__(self, coordinates: TCoordinates, ground_truth_polygons: Union[str, Sequence[Polygon], GeoDataFrame],
            noise_label: Optional[int] = -1):
        """
        :param coordinates: coordinates of points. These points should be spread over an area larger or equal to
            the ground truth area
        :param ground_truth_polygons: sequence of polygons, GeoDataFrame or path to a shapefile containing such a
            sequence. The polygons represent the ground truth for clustering.
            *Important*: the first polygon in the sequence is assumed to be the region within
            which ground truth was provided and has to cover all remaining polygons. This also means that all
            non-noise clusters in that region should be covered by a polygon
        :param noise_label: label to associate with noise or None
        :raises Exception: if the region contains no datapoints, if noise_label is None although some datapoints
            in the region lie outside of all cluster polygons, if cluster polygons intersect each other, or if
            a cluster polygon contains no datapoints
        """
        # The constructor might seem bloated but it really mostly does input validation for the polygons
        coordinates = extract_coordinates_array(coordinates)
        polygons: Sequence[Polygon]
        if isinstance(ground_truth_polygons, str):
            polygons = gp.read_file(ground_truth_polygons).geometry.values
        elif isinstance(ground_truth_polygons, GeoDataFrame):
            polygons = ground_truth_polygons.geometry.values
        else:
            polygons = ground_truth_polygons

        # first polygon: annotated region; all remaining polygons: one ground truth cluster each
        self.regionPolygon = polygons[0]
        self.noiseLabel = noise_label
        self.clusterPolygons = MultiPolygon(polygons[1:])
        # the part of the region that is not covered by any cluster polygon
        self.noisePolygon = self.regionPolygon.difference(self.clusterPolygons)

        # only datapoints inside the annotated region are considered
        self.regionMultipoint = MultiPoint(coordinates).intersection(self.regionPolygon)
        if self.regionMultipoint.is_empty:
            raise Exception("The ground truth region contains no datapoints. "
                            "This can happen if you have provided unsuitable coordinates")

        self.noiseMultipoint = self.regionMultipoint.intersection(self.noisePolygon)
        # The check is on the noise *datapoints*: the noise polygon itself is non-empty whenever the cluster
        # polygons do not exactly tile the region, which says nothing about whether any datapoint is noise.
        if self.noiseLabel is None and not self.noiseMultipoint.is_empty:
            num_noise_points = len(self._points(self.noiseMultipoint))
            raise Exception(f"No noise_label was provided but there is noise: {num_noise_points} datapoints "
                            f"in annotated area do not belong to any cluster polygon")

        self.clustersMultipoints = []
        intermediate_polygon = Polygon()  # union of the cluster polygons processed so far
        # ``.geoms`` instead of iterating the MultiPolygon directly: direct iteration was removed in Shapely 2.0.
        # Enumeration starts at 1 such that i is the polygon's position in the input (position 0 is the region).
        for i, clusterPolygon in enumerate(self.clusterPolygons.geoms, start=1):
            if not intermediate_polygon.intersection(clusterPolygon).is_empty:
                raise Exception(f"The polygons should be non-intersecting: polygon {i} intersects with previous polygons")
            intermediate_polygon = intermediate_polygon.union(clusterPolygon)
            cluster_multipoint = self.regionMultipoint.intersection(clusterPolygon)
            if cluster_multipoint.is_empty:
                raise Exception(f"The annotated cluster for polygon {i} is empty - check your data!")
            self.clustersMultipoints.append(cluster_multipoint)

    @staticmethod
    def _points(geometry) -> list:
        """
        :param geometry: result of intersecting a MultiPoint with a polygon; may be None, empty,
            a single-part geometry or a multi-part geometry
        :return: list of the individual parts of the geometry (empty list for None/empty geometries)
        """
        if geometry is None or geometry.is_empty:
            return []
        # multi-part geometries expose their members via ``geoms``; single-part geometries have no such attribute
        parts = getattr(geometry, "geoms", None)
        return [geometry] if parts is None else list(parts)

    def to_geodf(self, crs='epsg:3857', include_noise=True):
        """
        :param crs: coordinate reference system passed to the GeoDataFrame constructor
        :param include_noise: whether to prepend the noise datapoints as a separate entry; only has an effect
            if a noise label was provided at construction
        :return: GeoDataFrame with clusters as MultiPoint instance indexed by the clusters' identifiers
        """
        clusters = self.clustersMultipoints
        first_label = 0
        if self.noiseLabel is not None and include_noise:
            # noise comes first and receives the noise label; the clusters get the consecutive integers after it
            clusters = [self.noiseMultipoint] + clusters
            first_label = self.noiseLabel
        gdf = gp.GeoDataFrame({"geometry": clusters,
            "identifier": list(range(first_label, first_label + len(clusters), 1))}, crs=crs)
        gdf.set_index("identifier", drop=True, inplace=True)
        return gdf

    def plot(self, include_noise=True, **kwargs):
        """
        Plots the ground truth clusters, assigning a random color value to each of them

        :param include_noise: whether to also plot the noise datapoints (they receive the color value 0)
        :param kwargs: passed on to the plot method of the GeoDataFrame
        :return: None
        """
        gdf = self.to_geodf(include_noise=include_noise)
        gdf["color"] = np.random.random(len(gdf))
        if include_noise and self.noiseLabel is not None:
            gdf.loc[self.noiseLabel, "color"] = 0
        gdf.plot(column="color", **kwargs)

    def get_coordinates_labels(self):
        """
        Extract cluster coordinates and labels as numpy arrays from the provided ground truth region and
        cluster polygons

        :return: tuple of arrays of the type (coordinates, labels)
        """
        coords, labels = [], []
        for row in self.to_geodf(include_noise=True).itertuples():
            # normalise to a list of points: the entry may be empty (no noise) or consist of a single point
            points = self._points(row.geometry)
            coords.extend([p.x, p.y] for p in points)
            labels.extend([row.Index] * len(points))
        return np.array(coords), np.array(labels)