# -*- coding: utf-8 -*-
import logging
import math
import queue
from abc import ABC, abstractmethod
from typing import List, Sequence, Iterator, Callable, Optional, Union, Tuple
log = logging.getLogger(__name__)


class GreedyAgglomerativeClustering(object):
    """
    An implementation of greedy agglomerative clustering which avoids unnecessary
    recomputations of merge costs through the management of a priority queue of
    potential merges.

    Greedy agglomerative clustering works as follows. Starting with an initial
    set of clusters (where each cluster typically contains a single data point),
    the method successively merges the two clusters where the merge cost is lowest (greedy),
    until no further merges are admissible.
    The merge operation is a mutating operation, i.e. the initial clusters are modified.

    To apply the method, the Cluster class must be subclassed, so as to define
    what the cost of a merge in your application shall be and how two clusters can be merged.
    For example, if data points are points in a Cartesian coordinate system, then the merge cost
    can be defined as the minimum or maximum distance among all pairs of points in the two clusters,
    admissibility being determined by a threshold that must not be exceeded;
    the merge operation can simply concatenate lists of data points.
    """
    log = log.getChild(__qualname__)

    class Cluster(ABC):
        """
        Base class for clusters that can be merged via GreedyAgglomerativeClustering
        """
        @abstractmethod
        def merge_cost(self, other) -> float:
            """
            Computes the cost of merging the given cluster with this cluster

            :param other: the cluster for which to compute the cost of merging it with this cluster
            :return: the (non-negative) merge cost or math.inf if a merge is inadmissible
            """
            pass

        @abstractmethod
        def merge(self, other):
            """
            Merges the given cluster into this cluster

            :param other: the cluster that is to be merged into this cluster
            """
            pass

    def __init__(self, clusters: Sequence[Cluster],
            merge_candidate_determination_strategy: Optional[
                "GreedyAgglomerativeClustering.MergeCandidateDeterminationStrategy"] = None):
        """
        :param clusters: the initial clusters, which are to be agglomerated into larger clusters
        :param merge_candidate_determination_strategy: the strategy which determines, for a given cluster,
            the merge partners whose costs are to be evaluated; if None, use
            MergeCandidateDeterminationStrategyDefault, which considers all clusters
        """
        self.prioritised_merges = queue.PriorityQueue()
        self.wrapped_clusters = [
            GreedyAgglomerativeClustering.WrappedCluster(cluster, idx, self)
            for idx, cluster in enumerate(clusters)
        ]

        # initialise merge candidate determination strategy
        if merge_candidate_determination_strategy is None:
            merge_candidate_determination_strategy = self.MergeCandidateDeterminationStrategyDefault()
        merge_candidate_determination_strategy.set_clusterer(self)
        self.mergeCandidateDeterminationStrategy = merge_candidate_determination_strategy

    def apply_clustering(self) -> List[Cluster]:
        """
        Applies greedy agglomerative clustering to the clusters given at construction, merging
        clusters until no further merges are admissible

        :return: the list of agglomerated clusters (subset of the original clusters, which may have had other
            clusters merged into them)
        """
        # compute all possible merges, adding them to the priority queue
        self.log.debug("Computing initial merges")
        for idx, wc in enumerate(self.wrapped_clusters):
            if wc.is_merged():
                # only relevant if the method is called repeatedly: a cluster that was already merged
                # into another one must not generate merges of its own, because a surviving cluster
                # could otherwise be merged into it and thus be lost from the result
                continue
            self.log.debug("Computing potential merges for cluster index %d", idx)
            wc.compute_merges(True)

        # greedily apply the least-cost merges, skipping merges that were invalidated in the meantime
        steps = 0
        pending_merges = self.prioritised_merges
        while not pending_merges.empty():
            merge = pending_merges.get()
            if merge.evaporated:
                continue
            steps += 1
            self.log.debug("Clustering step %d", steps)
            merge.apply()

        return [wc.cluster for wc in self.wrapped_clusters if not wc.is_merged()]

    class WrappedCluster(object):
        """
        Wrapper for clusters which stores additional data required for clustering (internal use only)
        """
        def __init__(self, cluster, idx, clusterer: "GreedyAgglomerativeClustering"):
            """
            :param cluster: the cluster to wrap
            :param idx: the index of the cluster in the clusterer's list of wrapped clusters
            :param clusterer: the clusterer that the wrapped cluster belongs to
            """
            # the wrapped cluster that this cluster was merged into (None as long as it is unmerged)
            self.merged_into_cluster: Optional["GreedyAgglomerativeClustering.WrappedCluster"] = None
            # the potential merges that this cluster is currently involved in
            self.merges = []
            self.cluster = cluster
            self.idx = idx
            self.clusterer = clusterer

        def is_merged(self) -> bool:
            """
            :return: True if this cluster has been merged into another cluster, False otherwise
            """
            return self.merged_into_cluster is not None

        def get_cluster_association(self) -> "GreedyAgglomerativeClustering.WrappedCluster":
            """
            Gets the wrapped cluster that this cluster's points have ultimately been merged into (which may be the
            cluster itself)

            :return: the wrapped cluster this cluster's points are associated with
            """
            # follow the chain of merges iteratively, such that long chains cannot exceed the recursion limit
            wc = self
            while wc.merged_into_cluster is not None:
                wc = wc.merged_into_cluster
            return wc

        def remove_merges(self):
            """
            Invalidates all potential merges this cluster is involved in by flagging them as evaporated
            (such that they are skipped when they are taken from the priority queue) and forgets them
            """
            for merge in self.merges:
                merge.evaporated = True
            self.merges = []

        def compute_merges(self, initial: bool, merged_cluster_indices: Optional[Tuple[int, int]] = None):
            """
            Determines the admissible merges of this cluster with the candidates provided by the clusterer's
            merge candidate determination strategy and adds them to the clusterer's priority queue

            :param initial: whether the initial merges are being computed (at the start of the clustering algorithm)
            :param merged_cluster_indices: [for initial=False] the pair of cluster indices that were just joined
                to form this cluster; passed on to the strategy
            """
            clusterer = self.clusterer
            wrapped_clusters = clusterer.wrapped_clusters
            candidates = clusterer.mergeCandidateDeterminationStrategy.iter_candidate_indices(self, initial,
                merged_cluster_indices)
            for item in candidates:
                if isinstance(item, int):
                    # the strategy provided a cluster index: evaluate the cost of the respective merge
                    if item == self.idx:
                        continue
                    other = wrapped_clusters[item]
                    if other.is_merged():
                        continue
                    merge_cost = self.cluster.merge_cost(other.cluster)
                    if math.isinf(merge_cost):  # inadmissible merge
                        continue
                    merge = GreedyAgglomerativeClustering.ClusterMerge(self, other, merge_cost)
                else:
                    # the strategy provided a ready-made merge, which must have this cluster as its target
                    merge = item
                    assert merge.c1.idx == self.idx
                # register the merge with both clusters, such that it can be invalidated via either one
                merge.c1.merges.append(merge)
                merge.c2.merges.append(merge)
                clusterer.prioritised_merges.put(merge)

        def __str__(self):
            return "Cluster[idx=%d]" % self.idx

    class ClusterMerge(object):
        """
        Represents a potential merge
        """
        log = log.getChild(__qualname__)

        def __init__(self, c1: "GreedyAgglomerativeClustering.WrappedCluster",
                c2: "GreedyAgglomerativeClustering.WrappedCluster", merge_cost):
            """
            :param c1: the wrapped cluster that is retained, i.e. the one that c2 is merged into
            :param c2: the wrapped cluster that is merged into c1
            :param merge_cost: the cost of the merge
            """
            self.c1 = c1
            self.c2 = c2
            self.merge_cost = merge_cost
            # whether the merge has become invalid (because one of the two clusters has since changed)
            self.evaporated = False

        def apply(self):
            """
            Applies the merge: merges c2 into c1, invalidates all other potential merges involving either
            cluster and computes the new potential merges of the updated cluster c1
            """
            c1, c2 = self.c1, self.c2
            # NOTE: c2 is the cluster being absorbed, so it must come first in the message
            self.log.debug("Merging %s into %s...", c2, c1)
            c1.cluster.merge(c2.cluster)
            c2.merged_into_cluster = c1
            c1.remove_merges()
            c2.remove_merges()
            self.log.debug("Computing new merge costs for %s...", c1)
            c1.compute_merges(False, merged_cluster_indices=(c1.idx, c2.idx))

        def __lt__(self, other):
            # defines the order within the priority queue: merges with lower cost are retrieved first
            return self.merge_cost < other.merge_cost

    class MergeCandidateDeterminationStrategy(ABC):
        """
        Determines the indices of clusters which should be evaluated with regard to their merge costs
        """
        def __init__(self):
            self.clusterer: Optional["GreedyAgglomerativeClustering"] = None

        def set_clusterer(self, clusterer: "GreedyAgglomerativeClustering"):
            """
            Initialises the clusterer the strategy is applied to

            :param clusterer: the clusterer
            """
            self.clusterer = clusterer

        @abstractmethod
        def iter_candidate_indices(self, wc: "GreedyAgglomerativeClustering.WrappedCluster", initial: bool,
                merged_cluster_indices: Optional[Tuple[int, int]] = None) \
                -> Iterator[Union[int, "GreedyAgglomerativeClustering.ClusterMerge"]]:
            """
            :param wc: the wrapped cluster: the cluster for which to determine the cluster indices that are to be
                considered for a potential merge
            :param initial: whether we are computing the initial candidates (at the start of the clustering algorithm)
            :param merged_cluster_indices: [for initial=False] the pair of cluster indices that were just joined to
                form the updated cluster wc
            :return: an iterator of cluster indices that should be evaluated as potential merge partners for wc
                (it may contain the index of wc, which will be ignored); instead of an index, a ClusterMerge
                instance with wc as its first cluster may be provided, which is then used as is
            """
            pass

    class MergeCandidateDeterminationStrategyDefault(MergeCandidateDeterminationStrategy):
        """
        The default strategy, which considers all clusters as potential merge partners
        """
        def iter_candidate_indices(self, wc: "GreedyAgglomerativeClustering.WrappedCluster", initial: bool,
                merged_cluster_indices: Optional[Tuple[int, int]] = None) \
                -> Iterator[Union[int, "GreedyAgglomerativeClustering.ClusterMerge"]]:
            """
            :param wc: the wrapped cluster for which to determine the candidate indices
            :param initial: whether we are computing the initial candidates
            :param merged_cluster_indices: not used by this strategy
            :return: for initial=True, the indices following the index of wc (each pair of clusters is thus
                evaluated only once as the caller iterates over all clusters); otherwise all cluster indices
            """
            n = len(self.clusterer.wrapped_clusters)
            if initial:
                return range(wc.idx + 1, n)
            return range(n)