Source code for sensai.clustering.greedy_clustering

# -*- coding: utf-8 -*-
import logging
import math
import queue
from abc import ABC, abstractmethod
from typing import List, Sequence, Iterator, Optional, Union, Tuple

log = logging.getLogger(__name__)


class GreedyAgglomerativeClustering(object):
    """
    An implementation of greedy agglomerative clustering which avoids unnecessary recomputations of merge costs
    through the management of a priority queue of potential merges.

    Greedy agglomerative clustering works as follows. Starting with an initial set of clusters (where each cluster
    typically contains a single data point), the method successively merges the two clusters where the merge cost
    is lowest (greedy), until no further merges are admissible.
    The merge operation is a mutating operation, i.e. the initial clusters are modified.

    To apply the method, the Cluster class must be subclassed, so as to define what the cost of a merge in your
    application shall be and how two clusters can be merged.
    For example, if data points are points in a Cartesian coordinate system, then the merge cost can be defined as
    the minimum or maximum distance among all pairs of points in the two clusters, admissibility being determined
    by a threshold that must not be exceeded; the merge operation can simply concatenate lists of data points.
    """
    log = log.getChild(__qualname__)

    class Cluster(ABC):
        """
        Base class for clusters that can be merged via GreedyAgglomerativeClustering
        """
        @abstractmethod
        def merge_cost(self, other) -> float:
            """
            Computes the cost of merging the given cluster with this cluster

            :param other: the cluster whose merge with this cluster is to be evaluated
            :return: the (non-negative) merge cost or math.inf if a merge is inadmissible
            """
            pass

        @abstractmethod
        def merge(self, other):
            """
            Merges the given cluster into this cluster

            :param other: the cluster that is to be merged into this cluster
            """
            pass

    def __init__(self, clusters: Sequence[Cluster],
            merge_candidate_determination_strategy:
            Optional["GreedyAgglomerativeClustering.MergeCandidateDeterminationStrategy"] = None):
        """
        :param clusters: the initial clusters, which are to be agglomerated into larger clusters
        :param merge_candidate_determination_strategy: the strategy which determines, for a given cluster, the
            candidates that are to be evaluated as merge partners; if None, use
            MergeCandidateDeterminationStrategyDefault, which considers all clusters
        """
        self.prioritised_merges = queue.PriorityQueue()
        self.wrapped_clusters = [
            GreedyAgglomerativeClustering.WrappedCluster(cluster, idx, self)
            for idx, cluster in enumerate(clusters)
        ]

        # initialise merge candidate determination strategy
        if merge_candidate_determination_strategy is None:
            merge_candidate_determination_strategy = self.MergeCandidateDeterminationStrategyDefault()
        merge_candidate_determination_strategy.set_clusterer(self)
        self.mergeCandidateDeterminationStrategy = merge_candidate_determination_strategy

    def apply_clustering(self) -> List[Cluster]:
        """
        Applies greedy agglomerative clustering to the clusters given at construction, merging
        clusters until no further merges are admissible

        :return: the list of agglomerated clusters (subset of the original clusters, which may have had other
            clusters merged into them)
        """
        # compute all possible merges, adding them to the priority queue
        self.log.debug("Computing initial merges")
        for idx, wc in enumerate(self.wrapped_clusters):
            self.log.debug("Computing potential merges for cluster index %d", idx)
            wc.compute_merges(True)

        # greedily apply the least-cost merges; merges that were invalidated by an earlier merge
        # ("evaporated") remain in the queue and are simply skipped when they surface
        pending_merges = self.prioritised_merges
        steps = 0
        while not pending_merges.empty():
            merge = pending_merges.get()
            if merge.evaporated:
                continue
            steps += 1
            self.log.debug("Clustering step %d", steps)
            merge.apply()

        # the clusters that were never merged into another cluster hold the final result
        return [wc.cluster for wc in self.wrapped_clusters if not wc.is_merged()]

    class WrappedCluster(object):
        """
        Wrapper for clusters which stores additional data required for clustering (internal use only)
        """
        def __init__(self, cluster: "GreedyAgglomerativeClustering.Cluster", idx,
                clusterer: "GreedyAgglomerativeClustering"):
            """
            :param cluster: the cluster being wrapped
            :param idx: the index of the cluster in the clusterer's list of wrapped clusters
            :param clusterer: the clusterer that this wrapped cluster belongs to
            """
            self.merged_into_cluster: Optional["GreedyAgglomerativeClustering.WrappedCluster"] = None
            # the potential merges (ClusterMerge instances) which this cluster has been registered for
            self.merges = []
            self.cluster = cluster
            self.idx = idx
            self.clusterer = clusterer

        def is_merged(self) -> bool:
            """
            :return: True if this cluster has been merged into another cluster, False otherwise
            """
            return self.merged_into_cluster is not None

        def get_cluster_association(self) -> "GreedyAgglomerativeClustering.WrappedCluster":
            """
            Gets the wrapped cluster that this cluster's points have ultimately been merged into (which may be the
            cluster itself)

            :return: the wrapped cluster this cluster's points are associated with
            """
            # follow the chain of merges iteratively, such that long chains cannot exceed the recursion limit
            wc = self
            while wc.merged_into_cluster is not None:
                wc = wc.merged_into_cluster
            return wc

        def remove_merges(self):
            """
            Invalidates all potential merges this cluster is registered for by flagging them as evaporated
            (they remain in the priority queue and are skipped upon retrieval) and clears this cluster's list
            of merges
            """
            for merge in self.merges:
                merge.evaporated = True
            self.merges = []

        def compute_merges(self, initial: bool, merged_cluster_indices: Optional[Tuple[int, int]] = None):
            """
            Determines the admissible merges for this cluster based on the candidates provided by the clusterer's
            merge candidate determination strategy, registering each of them with the two clusters involved and
            adding it to the clusterer's priority queue

            :param initial: whether we are computing the initial candidates (at the start of the clustering
                algorithm)
            :param merged_cluster_indices: [for initial=False] the pair of cluster indices that were just joined
                to form the updated cluster
            """
            clusterer = self.clusterer
            wrapped_clusters = clusterer.wrapped_clusters
            candidates = clusterer.mergeCandidateDeterminationStrategy.iter_candidate_indices(self, initial,
                merged_cluster_indices)
            for item in candidates:
                # isinstance (rather than an exact type comparison) ensures that subclasses of int
                # are treated as indices, too
                if isinstance(item, int):
                    if item == self.idx:
                        continue  # a cluster cannot be merged with itself
                    other = wrapped_clusters[item]
                    if other.is_merged():
                        continue  # the candidate was already absorbed by another cluster
                    merge_cost = self.cluster.merge_cost(other.cluster)
                    if math.isinf(merge_cost):
                        continue  # inadmissible merge
                    merge = GreedyAgglomerativeClustering.ClusterMerge(self, other, merge_cost)
                else:
                    # the strategy provided a ready-made merge, which must have this cluster as its target
                    merge = item
                    assert merge.c1.idx == self.idx
                merge.c1.merges.append(merge)
                merge.c2.merges.append(merge)
                clusterer.prioritised_merges.put(merge)

        def __str__(self):
            return "Cluster[idx=%d]" % self.idx

    class ClusterMerge(object):
        """
        Represents a potential merge
        """
        log = log.getChild(__qualname__)

        def __init__(self, c1: "GreedyAgglomerativeClustering.WrappedCluster",
                c2: "GreedyAgglomerativeClustering.WrappedCluster", merge_cost):
            """
            :param c1: the cluster which, upon application of the merge, absorbs c2
            :param c2: the cluster which, upon application of the merge, is merged into c1
            :param merge_cost: the cost of the merge
            """
            self.c1 = c1
            self.c2 = c2
            self.merge_cost = merge_cost
            # whether the merge has become obsolete (because one of its clusters took part in another merge)
            self.evaporated = False

        def apply(self):
            """
            Applies the merge: merges c2 into c1, invalidates all other potential merges involving either of
            the two clusters and computes the new potential merges for the updated cluster c1
            """
            c1, c2 = self.c1, self.c2
            # c2 is the cluster being absorbed, c1 the one that remains
            self.log.debug("Merging %s into %s...", c2, c1)
            c1.cluster.merge(c2.cluster)
            c2.merged_into_cluster = c1
            c1.remove_merges()
            c2.remove_merges()
            self.log.debug("Computing new merge costs for %s...", c1)
            c1.compute_merges(False, merged_cluster_indices=(c1.idx, c2.idx))

        def __lt__(self, other):
            # ordering by merge cost, such that the priority queue yields the least-cost merge first
            return self.merge_cost < other.merge_cost

    class MergeCandidateDeterminationStrategy(ABC):
        """
        Determines the indices of clusters which should be evaluated with regard to their merge costs
        """
        def __init__(self):
            self.clusterer: Optional["GreedyAgglomerativeClustering"] = None

        def set_clusterer(self, clusterer: "GreedyAgglomerativeClustering"):
            """
            Initialises the clusterer the strategy is applied to

            :param clusterer: the clusterer
            """
            self.clusterer = clusterer

        @abstractmethod
        def iter_candidate_indices(self, wc: "GreedyAgglomerativeClustering.WrappedCluster", initial: bool,
                merged_cluster_indices: Optional[Tuple[int, int]] = None) \
                -> Iterator[Union[int, "GreedyAgglomerativeClustering.ClusterMerge"]]:
            """
            :param wc: the wrapped cluster: the cluster for which to determine the cluster indices that are to be
                considered for a potential merge
            :param initial: whether we are computing the initial candidates (at the start of the clustering
                algorithm)
            :param merged_cluster_indices: [for initial=False] the pair of cluster indices that were just joined
                to form the updated cluster wc
            :return: an iterator of cluster indices that should be evaluated as potential merge partners for wc
                (it may contain the index of wc, which will be ignored) or of corresponding ClusterMerge objects
            """
            pass

    class MergeCandidateDeterminationStrategyDefault(MergeCandidateDeterminationStrategy):
        """
        The default strategy, which considers all clusters as potential merge partners
        """
        def iter_candidate_indices(self, wc: "GreedyAgglomerativeClustering.WrappedCluster", initial: bool,
                merged_cluster_indices: Optional[Tuple[int, int]] = None) \
                -> Iterator[Union[int, "GreedyAgglomerativeClustering.ClusterMerge"]]:
            """
            :param wc: the wrapped cluster for which to determine the candidate indices
            :param initial: whether we are computing the initial candidates
            :param merged_cluster_indices: the pair of cluster indices that were just joined (unused)
            :return: an iterator of candidate cluster indices
            """
            n = len(self.clusterer.wrapped_clusters)
            if initial:
                # initially, compute_merges is called for every cluster in turn, so considering only the
                # clusters with a higher index suffices to cover each pair once
                yield from range(wc.idx + 1, n)
            else:
                yield from range(n)