from sklearn.cluster import KMeans
from sklearn import metrics
import numpy as np
import scipy
from scipy.spatial.distance import pdist, squareform, euclidean, \
correlation, cosine
from sklearn.preprocessing import MinMaxScaler
import logging
logger = logging.getLogger(__name__)
[docs]
def get_k(labels):
return len(np.unique(labels))
[docs]
def get_centers(X, labels):
return np.array([X[labels == l].mean(0) for l in np.unique(labels)])
[docs]
def ch_criterion(X, labels, distance=euclidean):
k = get_k(labels)
n = X.shape[0]
b = bgss(X, labels, distance)
w = wgss(X, labels, distance)
return (n - k)/(k - 1) * b / w
[docs]
def bgss(X, labels, distance=euclidean):
ds_mean = X.mean(0)
ss = 0
for i in np.unique(labels):
cluster_data = X[labels == i]
ss += (distance(cluster_data.mean(0), ds_mean) ** 2) * cluster_data.shape[0]
return ss
[docs]
def wgss(X, labels, distance=euclidean):
ss = 0
for i in np.unique(labels):
cluster_data = X[labels == i]
cluster_mean = cluster_data.mean(0)
css = 0
for x in cluster_data:
css += distance(x, cluster_mean) ** 2
ss += css
return ss
[docs]
def kl_criterion(X, labels, previous_labels=None, next_labels=None, precomputed=True):
n_cluster = get_k(labels)
"""
if n_cluster <= 1 or previous_labels==None or next_labels==None:
return 0
"""
n_prev_clusters = len(np.unique(previous_labels))
n_next_clusters = len(np.unique(next_labels))
if n_cluster != n_next_clusters-1 or n_prev_clusters+1 != n_cluster:
return 0
M_previous = m(X, previous_labels, precomputed=precomputed)
M_next = m(X, next_labels, precomputed=precomputed)
M_current = m(X, labels, precomputed=precomputed)
return 1 - 2 * M_current/M_previous + M_next/M_previous
[docs]
def W(X, labels, precomputed=True):
#distance = squareform(pdist(X, 'euclidean'))
import itertools
w_ = 0
for k in np.unique(labels):
cluster_ = labels == k
if precomputed == True:
index_cluster = np.nonzero(cluster_)
combinations_ = itertools.combinations(index_cluster[0], 2)
nrow = cluster_.shape[0]
array_indices = [get_triu_array_index(n[0], n[1], nrow) for n in combinations_]
cluster_dispersion = X[array_indices].sum()
else:
X_k = X[cluster_]
cluster_distance = squareform(pdist(X_k, 'euclidean'))
#cluster_distance = distance[cluster_,:][:,cluster_]
upper_index = np.triu_indices(cluster_distance.shape[0], k=1)
cluster_dispersion = cluster_distance[upper_index].sum()
w_ += (cluster_dispersion ** 2) * 0.5 * 1./cluster_.shape[0]
return w_
[docs]
def m(X, labels, precomputed=True):
n_cluster = get_k(labels)
return W(X, labels, precomputed=precomputed) * np.power(n_cluster, 2./X.shape[1])
[docs]
def gap(X, labels, nrefs=20, refs=None):
"""
Compute the Gap statistic for an nxm dataset in X.
Either give a precomputed set of reference distributions in refs as an (n,m,k) scipy array,
or state the number k of reference distributions in nrefs for automatic generation with a
uniformed distribution within the bounding box of X.
Give the list of k-values for which you want to compute the statistic in ks.
"""
shape = X.shape
if refs==None:
tops = X.max(axis=0)
bots = X.min(axis=0)
dists = scipy.matrix(scipy.diag(tops-bots))
rands = scipy.random.random_sample(size=(shape[0],shape[1],nrefs))
for i in range(nrefs):
rands[:,:,i] = rands[:,:,i]*dists+bots
else:
rands = refs
k = get_k(labels)
kml = labels
kmc = np.array([X[labels == l].mean(0) for l in np.unique(labels)])
disp = sum([euclidean(X[m,:], kmc[kml[m],:]) for m in range(shape[0])])
refdisps = scipy.zeros((rands.shape[2],))
for j in range(rands.shape[2]):
km = KMeans(n_clusters=k).transform(rands[:,:,j])
kml = km.labels_
kmc = km.cluster_centers_
refdisps[j] = sum([euclidean(rands[m,:,j], kmc[kml[m],:]) for m in range(shape[0])])
gaps = scipy.log(scipy.mean(refdisps))-scipy.log(disp)
return gaps
[docs]
def explained_variance(X, labels):
explained_variance_ = 0
k_ = get_k(labels)
great_avg = X.mean()
for i in np.unique(labels):
cluster_mask = labels == i
group_avg = X[cluster_mask].mean()
n_group = np.count_nonzero(cluster_mask)
group_var = n_group * np.power((group_avg - great_avg), 2) / (k_ - 1)
explained_variance_ += group_var
return explained_variance_
[docs]
def global_explained_variance(X, labels):
# Get the centroids for each cluster
centroids = np.array([X[labels == l].mean(0) for l in np.unique(labels)])
# Compute the global power
global_conn_power = X.std(axis=1)
denominator_ = np.sum(global_conn_power**2)
numerator_ = 0
for i, conn_pwr in enumerate(global_conn_power):
k_map = centroids[labels[i]]
if k_map.shape[0] == 2:
corr_ = 1 - cosine(X[i], k_map)
else:
corr_ = scipy.stats.pearsonr(X[i], k_map)[0]
numerator_ += np.power((conn_pwr * corr_), 2)
return numerator_/denominator_
[docs]
def cross_validation_index(X, labels):
n_maps = get_k(labels)
n_points = X.shape[0]
centroids = np.array([X[labels == l].mean(0) for l in np.unique(labels)])
sum_ = 0
for i, u in enumerate(X):
sum_ += euclidean(u, u)**2 - np.dot(centroids[labels[i]], u) ** 2
cv_criterion = sum_/(n_points**2 - 1) * ((n_points - 1)/(n_points - n_maps - 1))**2
return cv_criterion
[docs]
def get_triu_array_index(i, j, n_row):
return (n_row*i+j)-np.sum([(s+1) for s in range(i+1)])
[docs]
def index_i(X, labels):
k = get_k(labels)
if k == 2:
return 0
centroids = get_centers(X, labels)
center = X.mean(0)
ek = 0.
e1 = 0.
for i, x in enumerate(X):
ek += euclidean(x, centroids[labels[i]])
e1 += euclidean(x, center)
pair_dist = pdist(np.vstack(centroids), 'euclidean')
dk = pair_dist.max()
mk = pair_dist.min()
i_ = (1./k * float(e1)/ek * dk * mk)**2
return i_
metrics = {'silhouette': metrics.silhouette_score,
'kl': kl_criterion,
'wgss': wgss,
'gev': global_explained_variance,
'ev': explained_variance,
'ch': ch_criterion
}