# Source code for dials.algorithms.symmetry.cosym.seed_clustering

"""Seed clustering method for cosym analysis."""

import copy
import logging

logger = logging.getLogger(__name__)

import math

import numpy as np
import scipy.spatial.distance as ssd
from scipy.cluster import hierarchy
from sklearn import metrics
from sklearn.neighbors import NearestNeighbors

from libtbx import Auto
from libtbx.utils import Sorry


class seed_clustering:
    """Perform seed clustering of coordinates.

    Labels points into clusters such that each cluster contains exactly one
    copy of each dataset, then performs silhouette analysis on the resulting
    clusters to determine the true number of clusters present, under the
    constraint that only equal-sized clusterings are valid, i.e. each dataset
    should appear an equal number of times in each cluster.

    See also:
      https://en.wikipedia.org/wiki/Silhouette_(clustering)
      http://scikit-learn.org/stable/auto_examples/cluster/plot_kmeans_silhouette_analysis.html

    Attributes:
      cluster_labels (np.ndarray): A label for each coordinate.
    """

    def __init__(
        self, coordinates, n_datasets, n_sym_ops, min_silhouette_score, n_clusters=Auto
    ):
        """Initialise a seed_clustering object.

        Args:
          coordinates (np.ndarray): The input array of coordinates on which to
            perform the analysis, one coordinate per row. The dimensions of the
            array should be (`n_datasets` * `n_sym_ops`, dim), where dim is the
            number of dimensions being used for the analysis.
          n_datasets (int): The number of datasets.
          n_sym_ops (int): The number of symmetry operations.
          min_silhouette_score (float): The minimum silhouette score to be used
            in automatic determination of the number of clusters.
          n_clusters (int): Optionally override the automatic determination of
            the number of clusters.
        """
        self.coords = coordinates
        self.cluster_labels = self._label_clusters_first_pass(n_datasets, n_sym_ops)
        if self.cluster_labels.max() == 0:
            # First pass found only one cluster: assume a single cluster and
            # skip the hierarchical clustering/silhouette analysis entirely.
            return
        _, linkage_matrix = self._hierarchical_clustering()
        self.cluster_labels, _ = self._silhouette_analysis(
            self.cluster_labels,
            linkage_matrix,
            n_clusters=n_clusters,
            min_silhouette_score=min_silhouette_score,
        )

    def _label_clusters_first_pass(self, n_datasets, n_sym_ops):
        """First pass labelling of clusters.

        Labels points into clusters such that each cluster contains exactly one
        copy of each dataset.

        Args:
          n_datasets (int): The number of datasets.
          n_sym_ops (int): The number of symmetry operations.

        Returns:
          cluster_labels (np.ndarray): A label for each coordinate, labelled
          from 0 .. n_sym_ops.
        """
        # Initialise cluster labels: -1 signifies doesn't belong to a cluster.
        cluster_labels = np.full(self.coords.shape[0], -1, dtype=int)
        cluster_id = 0
        while (cluster_labels == -1).sum() > 0:
            coord_ids = np.arange(n_datasets * n_sym_ops)
            # Row layout maps coordinate index to dataset via modulo.
            dataset_ids = coord_ids % n_datasets
            # Select only those points that don't already belong to a cluster.
            sel = np.where(cluster_labels == -1)
            X = self.coords[sel]
            dataset_ids = dataset_ids[sel]
            coord_ids = coord_ids[sel]
            # Choose a high-density point as the seed for this cluster: the
            # point with the smallest mean cosine distance to its neighbours.
            nbrs = NearestNeighbors(
                n_neighbors=min(11, len(X)), algorithm="brute", metric="cosine"
            ).fit(X)
            distances, _ = nbrs.kneighbors(X)
            # dist[0] is each point's zero self-distance; exclude it.
            average_distance = np.array([dist[1:].mean() for dist in distances])
            i = average_distance.argmin()
            d_id = dataset_ids[i]
            cluster = np.array([coord_ids[i]])
            xis = np.array([X[i]])
            for _ in range(n_datasets - 1):
                # Select only those rows that don't correspond to a dataset
                # already present in the current cluster.
                sel = np.where(dataset_ids != d_id)
                X = X[sel]
                dataset_ids = dataset_ids[sel]
                coord_ids = coord_ids[sel]
                assert len(X) > 0
                # Find the nearest neighbour in cosine-space to the current
                # cluster centroid.
                nbrs = NearestNeighbors(
                    n_neighbors=min(1, len(X)), algorithm="brute", metric="cosine"
                ).fit(X)
                _, indices = nbrs.kneighbors([xis.mean(axis=0)])
                k = indices[0][0]
                d_id = dataset_ids[k]
                cluster = np.append(cluster, coord_ids[k])
                xis = np.append(xis, [X[k]], axis=0)
            # Label this cluster.
            cluster_labels[cluster] = cluster_id
            cluster_id += 1
        return cluster_labels

    def _hierarchical_clustering(self):
        """Perform hierarchical clustering on cluster centroids.

        Returns:
          Tuple[numpy.ndarray, numpy.ndarray]: A tuple containing the distance
          matrix as output by :func:`scipy.spatial.distance.pdist` and the
          linkage matrix as output by :func:`scipy.cluster.hierarchy.linkage`.
        """
        cluster_centroids = [
            self.coords[self.cluster_labels == i].mean(axis=0)
            for i in np.unique(self.cluster_labels)
        ]
        # Hierarchical clustering of cluster centroids, using cosine metric.
        dist_mat = ssd.pdist(cluster_centroids, metric="cosine")
        return dist_mat, hierarchy.linkage(dist_mat, method="average")

    def _silhouette_analysis(
        self, cluster_labels, linkage_matrix, n_clusters, min_silhouette_score
    ):
        """Compare valid equal-sized clusterings using silhouette scores.

        Args:
          cluster_labels (np.ndarray): First-pass cluster label for each
            coordinate.
          linkage_matrix (np.ndarray): The hierarchical clustering of centroids
            of the initial clustering as produced by
            :func:`scipy.cluster.hierarchy.linkage`.
          n_clusters (int): Optionally override the automatic determination of
            the number of clusters.
          min_silhouette_score (float): The minimum silhouette score to be used
            in automatic determination of the number of clusters.

        Returns:
          Tuple[np.ndarray, float]: A label for each coordinate and the
          distance threshold at which the dendrogram was cut.

        Raises:
          Sorry: If `n_clusters` was given explicitly and no valid equal-sized
            clustering with that many clusters exists.
        """
        eps = 1e-6
        cluster_labels_input = cluster_labels
        # Column 2 of the linkage matrix holds the merge distances; prepend 0
        # so distances[1:] iterates over every merge threshold.
        distances = linkage_matrix[::, 2]
        distances = np.insert(distances, 0, 0)
        silhouette_scores = []
        thresholds = []
        threshold_n_clusters = []
        for threshold in distances[1:]:
            cluster_labels = copy.deepcopy(cluster_labels_input)
            # Cut the dendrogram just below this merge distance.
            labels = hierarchy.fcluster(
                linkage_matrix, threshold - eps, criterion="distance"
            ).tolist()
            counts = [labels.count(l) for l in set(labels)]
            if len(set(counts)) > 1:
                # Only equal-sized clusters are valid.
                continue
            n = len(set(labels))
            if n == 1:
                continue
            elif n_clusters is not Auto and n != n_clusters:
                continue
            # Re-map each first-pass cluster to its merged cluster id (0-based).
            for i in range(len(labels)):
                cluster_labels[cluster_labels_input == i] = int(labels[i] - 1)
            if len(np.unique(cluster_labels)) == self.coords.shape[0]:
                # Silhouette coefficient not defined if 1 dataset per cluster;
                # not sure what the default value should be.
                # BUG FIX: ndarray.size is an attribute, not a method — the
                # original `cluster_labels.size()` raised TypeError here.
                sample_silhouette_values = np.full(cluster_labels.size, 0)
            else:
                # Compute the silhouette scores for each sample.
                sample_silhouette_values = metrics.silhouette_samples(
                    self.coords, cluster_labels, metric="cosine"
                )
            silhouette_avg = sample_silhouette_values.mean()
            silhouette_scores.append(silhouette_avg)
            thresholds.append(threshold)
            threshold_n_clusters.append(n)
            count_negative = (sample_silhouette_values < 0).sum()
            logger.info("Clustering:")
            logger.info(" Number of clusters: %i", n)
            logger.info(
                " Threshold score: %.3f (%.1f deg)",
                threshold,
                math.degrees(math.acos(1 - threshold)),
            )
            logger.info(" Silhouette score: %.3f", silhouette_avg)
            logger.info(
                " -ve silhouette scores: %.1f%%",
                100 * count_negative / sample_silhouette_values.size,
            )
        if n_clusters is Auto:
            # BUG FIX: a higher silhouette score indicates a better-separated
            # clustering, so select the best score; the original np.argmin
            # picked the *worst* clustering, contradicting the
            # min_silhouette_score acceptance test below.
            idx = int(np.argmax(silhouette_scores))
        else:
            # BUG FIX: list.index raises ValueError rather than returning
            # None, so the original `if idx is None` guard was unreachable
            # and a missing clustering surfaced as a raw traceback.
            try:
                idx = threshold_n_clusters.index(n_clusters)
            except ValueError:
                raise Sorry("No valid clustering with %i clusters" % n_clusters)
        if n_clusters is Auto and silhouette_scores[idx] < min_silhouette_score:
            # Best score is below the acceptance threshold: assume a single
            # cluster. dtype=int keeps the labels consistent with the other
            # code paths (the original produced a float array here).
            # NOTE(review): `threshold` returned below is the last value from
            # the loop in this branch — presumably only meaningful in the
            # else-branch; confirm against callers.
            cluster_labels = np.zeros(cluster_labels.size, dtype=int)
        else:
            threshold = thresholds[idx] - eps
            labels = hierarchy.fcluster(linkage_matrix, threshold, criterion="distance")
            cluster_labels = np.full(self.coords.shape[0], -1, dtype=int)
            for i in range(len(labels)):
                cluster_labels[cluster_labels_input == i] = labels[i] - 1
        return cluster_labels, threshold