#!/usr/bin/env python3
""" K-Means algorithm module. """
from dask_ml.cluster import KMeans as KMeans_MCPU
from sklearn.cluster import KMeans as KMeans_CPU
from dasf.ml.cluster.classifier import ClusterClassifier
from dasf.utils.decorators import task_handler
from dasf.utils.funcs import is_gpu_supported
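# cuML (GPU) backends are optional: if the import below fails, only the
# scikit-learn and Dask-ML estimators are built in the constructor, and the
# GPU estimators are instantiated only when is_gpu_supported() returns True.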
try:
from cuml.cluster import KMeans as KMeans_GPU
from cuml.dask.cluster import KMeans as KMeans_MGPU
except ImportError:
pass
class KMeans(ClusterClassifier):
"""
K-Means clustering.
Read more in the :ref:`User Guide <k_means>`.
Parameters
----------
n_clusters : int, default=8
The number of clusters to form as well as the number of
centroids to generate.
init : {'k-means++', 'random'}, callable or array-like of shape (n_clusters, n_features), default='k-means++'
Method for initialization:
'k-means++' : selects initial cluster centers for k-means
clustering in a smart way to speed up convergence. See section
Notes in k_init for more details.
'random': choose `n_clusters` observations (rows) at random from data
for the initial centroids.
If an array is passed, it should be of shape (n_clusters, n_features)
and gives the initial centers.
If a callable is passed, it should take arguments X, n_clusters and a
random state and return an initialization.
n_init : int, default=10
Number of times the k-means algorithm will be run with different
centroid seeds. The final results will be the best output of
n_init consecutive runs in terms of inertia.
max_iter : int, default=300
Maximum number of iterations of the k-means algorithm for a
single run.
tol : float, default=1e-4
Relative tolerance with regard to the Frobenius norm of the difference
in the cluster centers of two consecutive iterations to declare
convergence.
precompute_distances : {'auto', True, False}, default='auto'
Precompute distances (faster but takes more memory).
'auto' : do not precompute distances if n_samples * n_clusters > 12
million. This corresponds to about 100MB overhead per job using
double precision. IMPORTANT: This is used only in the Dask ML version.
True : always precompute distances.
False : never precompute distances.
verbose : int, default=0
Verbosity mode.
random_state : int, RandomState instance or None, default=None
Determines random number generation for centroid initialization. Use
an int to make the randomness deterministic.
See :term:`Glossary <random_state>`.
copy_x : bool, default=True
When pre-computing distances it is more numerically accurate to center
the data first. If copy_x is True (default), then the original data is
not modified. If False, the original data is modified, and put back
before the function returns, but small numerical differences may be
introduced by subtracting and then adding the data mean. Note that if
the original data is not C-contiguous, a copy will be made even if
copy_x is False. If the original data is sparse, but not in CSR format,
a copy will be made even if copy_x is False.
n_jobs : int, default=1
The number of OpenMP threads to use for the computation. Parallelism is
sample-wise on the main cython loop which assigns each sample to its
closest center. IMPORTANT: This is used only in the Dask ML version.
``None`` or ``-1`` means using all processors.
init_max_iter : int, default=None
Number of iterations for the initialization step.
algorithm : {"lloyd", "elkan"}, default="lloyd"
K-means algorithm to use. The classical EM-style algorithm is "lloyd".
The "elkan" variation can be more efficient on some datasets with
well-defined clusters, by using the triangle inequality. However
it's more memory intensive due to the allocation of an extra array of
shape (n_samples, n_clusters).
.. versionchanged:: 0.18
Added Elkan algorithm
oversampling_factor : float, default=2.0
The number of points to sample in scalable k-means++ initialization
for potential centroids. Increasing this value can lead to better
initial centroids at the cost of memory. The total number of centroids
sampled in scalable k-means++ is oversampling_factor * n_clusters * 8.
max_samples_per_batch : int, default=32768
The number of data samples to use for batches of the pairwise distance
computation. This computation is done throughout both fit and predict. The
default should suit most cases. The total number of elements in the
batched pairwise distance computation is max_samples_per_batch *
n_clusters. It might become necessary to lower this number when
n_clusters becomes prohibitively large.
output_type : {'input', 'cudf', 'cupy', 'numpy', 'numba'}, default=None
Variable to control output type of the results and attributes of the
estimator. If None, it'll inherit the output type set at the module
level, cuml.global_settings.output_type. See Output Data Type
Configuration for more info.
See Also
--------
MiniBatchKMeans : Alternative online implementation that does incremental
updates of the centers positions using mini-batches.
For large scale learning (say n_samples > 10k) MiniBatchKMeans is
probably much faster than the default batch implementation.
Notes
-----
The k-means problem is solved using either Lloyd's or Elkan's algorithm.
The average complexity is given by O(k n T), where n is the number of
samples and T is the number of iterations.
The worst case complexity is given by O(n^(k+2/p)) with
n = n_samples, p = n_features. (D. Arthur and S. Vassilvitskii,
'How slow is the k-means method?' SoCG2006)
In practice, the k-means algorithm is very fast (one of the fastest
clustering algorithms available), but it can fall into local minima. That's why
it can be useful to restart it several times.
If the algorithm stops before fully converging (because of ``tol`` or
``max_iter``), ``labels_`` and ``cluster_centers_`` will not be consistent,
i.e. the ``cluster_centers_`` will not be the means of the points in each
cluster. Also, the estimator will reassign ``labels_`` after the last
iteration to make ``labels_`` consistent with ``predict`` on the training
set.
Examples
--------
>>> from dasf.ml.cluster import KMeans
>>> import numpy as np
>>> X = np.array([[1, 2], [1, 4], [1, 0],
... [10, 2], [10, 4], [10, 0]])
>>> kmeans = KMeans(n_clusters=2, random_state=0).fit(X)
>>> kmeans.predict([[0, 0], [12, 3]])
array([1, 0], dtype=int32)
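The lazy variants also accept Dask collections. A minimal sketch, assuming
a Dask client is running so that the Dask-ML implementation is selected by
the executor:
>>> import dask.array as da  # doctest: +SKIP
>>> X_dask = da.from_array(X, chunks=(3, 2))  # doctest: +SKIP
>>> kmeans = KMeans(n_clusters=2, random_state=0).fit(X_dask)  # doctest: +SKIP
>>> kmeans.predict(X_dask)  # doctest: +SKIP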
For further information, see:
- https://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html
- https://ml.dask.org/modules/generated/dask_ml.cluster.KMeans.html
- https://docs.rapids.ai/api/cuml/stable/api.html#k-means-clustering
- https://docs.rapids.ai/api/cuml/stable/api.html#cuml.dask.cluster.KMeans
"""
def __init__(
self,
n_clusters=8,
init=None,
n_init=None,
max_iter=300,
tol=0.0001,
verbose=0,
random_state=None,
copy_x=True,
algorithm='lloyd',
oversampling_factor=2.0,
n_jobs=1,
init_max_iter=None,
max_samples_per_batch=32768,
precompute_distances='auto',
output_type=None,
**kwargs
):
""" Constructor of the class KMeans. """
super().__init__(**kwargs)
self.n_clusters = n_clusters
self.random_state = random_state
self.max_iter = max_iter
self.init = init
self.n_init = n_init
self.tol = tol
self.verbose = verbose
self.copy_x = copy_x
self.algorithm = algorithm
self.oversampling_factor = oversampling_factor
self.n_jobs = n_jobs
self.init_max_iter = init_max_iter
self.max_samples_per_batch = max_samples_per_batch
self.precompute_distances = precompute_distances
self.output_type = output_type
# Estimator for CPU operations
self.__kmeans_cpu = KMeans_CPU(
n_clusters=n_clusters,
random_state=random_state,
max_iter=max_iter,
init=("k-means++" if init is None else init),
n_init=(10 if n_init is None else n_init),
tol=tol,
verbose=verbose,
copy_x=copy_x,
algorithm=algorithm,
)
# Estimator for Dask ML operations
self.__kmeans_mcpu = KMeans_MCPU(
n_clusters=n_clusters,
random_state=random_state,
max_iter=max_iter,
init=("k-means||" if init is None else init),
tol=tol,
oversampling_factor=oversampling_factor,
algorithm=algorithm,
n_jobs=n_jobs,
init_max_iter=init_max_iter,
copy_x=copy_x,
precompute_distances=precompute_distances,
)
if is_gpu_supported():
# Estimator for CuML operations
self.__kmeans_gpu = KMeans_GPU(
n_clusters=n_clusters,
random_state=(1 if random_state is None else random_state),
max_iter=max_iter,
tol=tol,
verbose=verbose,
init=("scalable-k-means++" if init is None else init),
oversampling_factor=oversampling_factor,
max_samples_per_batch=max_samples_per_batch,
)
# XXX: KMeans in Multi GPU requires a Client instance,
# skip if not present.
try:
self.__kmeans_mgpu = KMeans_MGPU(
n_clusters=n_clusters,
random_state=(1 if random_state is None else random_state),
max_iter=max_iter,
tol=tol,
verbose=verbose,
init=("scalable-k-means++" if init is None else init),
oversampling_factor=oversampling_factor,
max_samples_per_batch=max_samples_per_batch,
)
except ValueError:
self.__kmeans_mgpu = None
def _lazy_fit_cpu(self, X, y=None, sample_weight=None):
"""
Compute Dask k-means clustering.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
Training instances to cluster. It must be noted that the data
will be converted to C ordering, which will cause a memory
copy if the given data is not C-contiguous.
If a sparse matrix is passed, a copy will be made if it's not in
CSR format.
y : Ignored
Not used, present here for API consistency by convention.
sample_weight : Ignored
Not used, present here for API consistency by convention.
Returns
-------
self : object
Fitted estimator.
"""
return self.__kmeans_mcpu.fit(X=X, y=y)
def _lazy_fit_gpu(self, X, y=None, sample_weight=None):
"""
Compute Dask CuML k-means clustering.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
Training instances to cluster. It must be noted that the data
will be converted to C ordering, which will cause a memory
copy if the given data is not C-contiguous.
If a sparse matrix is passed, a copy will be made if it's not in
CSR format.
y : Ignored
Not used, present here for API consistency by convention.
sample_weight : array-like of shape (n_samples,), default=None
The weights for each observation in X. If None, all observations
are assigned equal weight.
Returns
-------
self : object
Fitted estimator.
"""
if self.__kmeans_mgpu is None:
raise NotImplementedError
return self.__kmeans_mgpu.fit(X=X, sample_weight=sample_weight)
def _fit_cpu(self, X, y=None, sample_weight=None):
"""
Compute Scikit Learn k-means clustering.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
Training instances to cluster. It must be noted that the data
will be converted to C ordering, which will cause a memory
copy if the given data is not C-contiguous.
If a sparse matrix is passed, a copy will be made if it's not in
CSR format.
y : Ignored
Not used, present here for API consistency by convention.
sample_weight : array-like of shape (n_samples,), default=None
The weights for each observation in X. If None, all observations
are assigned equal weight.
Returns
-------
self : object
Fitted estimator.
"""
return self.__kmeans_cpu.fit(X=X, y=y, sample_weight=sample_weight)
def _fit_gpu(self, X, y=None, sample_weight=None):
"""
Compute CuML k-means clustering.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
Training instances to cluster. It must be noted that the data
will be converted to C ordering, which will cause a memory
copy if the given data is not C-contiguous.
If a sparse matrix is passed, a copy will be made if it's not in
CSR format.
y : Ignored
Not used, present here for API consistency by convention.
sample_weight : array-like of shape (n_samples,), default=None
The weights for each observation in X. If None, all observations
are assigned equal weight.
Returns
-------
self : object
Fitted estimator.
"""
return self.__kmeans_gpu.fit(X=X, sample_weight=sample_weight)
def _lazy_fit_predict_cpu(self, X, y=None, sample_weight=None):
"""
Compute cluster centers and predict cluster index for each sample using
Dask ML.
Convenience method; equivalent to calling fit(X) followed by
predict(X).
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
New data to transform.
y : Ignored
Not used, present here for API consistency by convention.
sample_weight : Ignored
Not used, present here for API consistency by convention.
Returns
-------
labels : ndarray of shape (n_samples,)
Index of the cluster each sample belongs to.
"""
local_kmeans = self.__kmeans_mcpu.fit(X=X, y=y)
return local_kmeans.predict(X=X)
def _lazy_fit_predict_gpu(self, X, y=None, sample_weight=None):
"""
Compute cluster centers and predict cluster index for each sample using
Dask CuML.
Convenience method; equivalent to calling fit(X) followed by
predict(X).
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
New data to transform.
y : Ignored
Not used, present here for API consistency by convention.
sample_weight : array-like of shape (n_samples,), default=None
The weights for each observation in X. If None, all observations
are assigned equal weight.
Returns
-------
labels : ndarray of shape (n_samples,)
Index of the cluster each sample belongs to.
"""
if self.__kmeans_mgpu is None:
raise NotImplementedError
return self.__kmeans_mgpu.fit_predict(X, sample_weight=sample_weight)
def _fit_predict_cpu(self, X, y=None, sample_weight=None):
"""
Compute cluster centers and predict cluster index for each sample using
Scikit Learn.
Convenience method; equivalent to calling fit(X) followed by
predict(X).
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
New data to transform.
y : Ignored
Not used, present here for API consistency by convention.
sample_weight : Ignored
Not used, present here for API consistency by convention.
Returns
-------
labels : ndarray of shape (n_samples,)
Index of the cluster each sample belongs to.
"""
return self.__kmeans_cpu.fit_predict(X)
def _fit_predict_gpu(self, X, y=None, sample_weight=None):
"""
Compute cluster centers and predict cluster index for each sample using
CuML.
Convenience method; equivalent to calling fit(X) followed by
predict(X).
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
New data to transform.
y : Ignored
Not used, present here for API consistency by convention.
sample_weight : array-like of shape (n_samples,), default=None
The weights for each observation in X. If None, all observations
are assigned equal weight.
Returns
-------
labels : ndarray of shape (n_samples,)
Index of the cluster each sample belongs to.
"""
return self.__kmeans_gpu.fit_predict(X=X, sample_weight=sample_weight)
def _lazy_predict_cpu(self, X, sample_weight=None):
"""
Predict the closest cluster each sample in X belongs to using Dask ML.
In the vector quantization literature, `cluster_centers_` is called
the code book and each value returned by `predict` is the index of
the closest code in the code book.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
New data to predict.
sample_weight : Ignored
Not used, present here for API consistency by convention.
Returns
-------
labels : ndarray of shape (n_samples,)
Index of the cluster each sample belongs to.
"""
return self.__kmeans_mcpu.predict(X)
def _lazy_predict_gpu(self, X, sample_weight=None):
"""
Predict the closest cluster each sample in X belongs to using Dask
CuML.
In the vector quantization literature, `cluster_centers_` is called
the code book and each value returned by `predict` is the index of
the closest code in the code book.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
New data to predict.
sample_weight : array-like of shape (n_samples,), default=None
The weights for each observation in X. If None, all observations
are assigned equal weight.
Returns
-------
labels : ndarray of shape (n_samples,)
Index of the cluster each sample belongs to.
"""
if self.__kmeans_mgpu is None:
raise NotImplementedError
return self.__kmeans_mgpu.predict(X, sample_weight)
def _predict_cpu(self, X, sample_weight=None):
"""
Predict the closest cluster each sample in X belongs to using Scikit
Learn.
In the vector quantization literature, `cluster_centers_` is called
the code book and each value returned by `predict` is the index of
the closest code in the code book.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
New data to predict.
sample_weight : array-like of shape (n_samples,), default=None
The weights for each observation in X. If None, all observations
are assigned equal weight.
Returns
-------
labels : ndarray of shape (n_samples,)
Index of the cluster each sample belongs to.
"""
return self.__kmeans_cpu.predict(X)
def _predict_gpu(self, X, sample_weight=None):
"""
Predict the closest cluster each sample in X belongs to using CuML.
In the vector quantization literature, `cluster_centers_` is called
the code book and each value returned by `predict` is the index of
the closest code in the code book.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
New data to predict.
sample_weight : array-like of shape (n_samples,), default=None
The weights for each observation in X. If None, all observations
are assigned equal weight.
Returns
-------
labels : ndarray of shape (n_samples,)
Index of the cluster each sample belongs to.
"""
return self.__kmeans_gpu.predict(X, sample_weight=sample_weight)
def _lazy_predict2_cpu(self, X, sample_weight=None):
"""
A block-wise predict using the Scikit Learn variant, but for Dask.
In the vector quantization literature, `cluster_centers_` is called
the code book and each value returned by `predict` is the index of
the closest code in the code book.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
New data to predict.
sample_weight : array-like of shape (n_samples,), default=None
The weights for each observation in X. If None, all observations
are assigned equal weight.
Returns
-------
labels : ndarray of shape (n_samples,)
Index of the cluster each sample belongs to.
"""
def __predict(block):
"""Block function to predict data per block."""
return self._predict_cpu(block, sample_weight=sample_weight)
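# Run the single-node predictor on each Dask block; the feature axis is
# dropped so every block maps to a 1-D array of cluster labels.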
return X.map_blocks(
__predict, chunks=(X.chunks[0],), drop_axis=[1], dtype=X.dtype
)
def _lazy_predict2_gpu(self, X, sample_weight=None):
"""
A block-wise predict using the CuML variant, but for Dask.
In the vector quantization literature, `cluster_centers_` is called
the code book and each value returned by `predict` is the index of
the closest code in the code book.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
New data to predict.
sample_weight : array-like of shape (n_samples,), default=None
The weights for each observation in X. If None, all observations
are assigned equal weight.
Returns
-------
labels : ndarray of shape (n_samples,)
Index of the cluster each sample belongs to.
"""
def __predict(block):
"""Block function to predict data per block."""
return self._predict_gpu(block, sample_weight=sample_weight)
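# Run the single-GPU predictor on each Dask block; the feature axis is
# dropped so every block maps to a 1-D array of cluster labels.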
return X.map_blocks(
__predict, chunks=(X.chunks[0],), drop_axis=[1], dtype=X.dtype
)
def _predict2_cpu(self, X, sample_weight=None, compat=True):
"""
A placeholder for the block predict using the Scikit Learn variant.
In the vector quantization literature, `cluster_centers_` is called
the code book and each value returned by `predict` is the index of
the closest code in the code book.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
New data to predict.
sample_weight : array-like of shape (n_samples,), default=None
The weights for each observation in X. If None, all observations
are assigned equal weight.
compat : bool
predict2 has no dedicated single CPU/GPU implementation. If True,
fall back to the original predict method; otherwise, raise an
exception.
Returns
-------
labels : ndarray of shape (n_samples,)
Index of the cluster each sample belongs to.
"""
if compat:
return self._predict_cpu(X, sample_weight=sample_weight)
raise NotImplementedError("Method available only for Dask.")
def _predict2_gpu(self, X, sample_weight=None, compat=True):
"""
A placeholder for the block predict using the CuML variant.
In the vector quantization literature, `cluster_centers_` is called
the code book and each value returned by `predict` is the index of
the closest code in the code book.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
New data to predict.
sample_weight : array-like of shape (n_samples,), default=None
The weights for each observation in X. If None, all observations
are assigned equal weight.
compat : bool
predict2 has no dedicated single CPU/GPU implementation. If True,
fall back to the original predict method; otherwise, raise an
exception.
Returns
-------
labels : ndarray of shape (n_samples,)
Index of the cluster each sample belongs to.
"""
if compat:
return self._predict_gpu(X, sample_weight=sample_weight)
raise NotImplementedError("Method available only for Dask.")
@task_handler
def predict2(self, sample_weight=None):
"""
Generic predict2 function dispatched according to the executor (available for some ML methods only).
"""
...