# Source code for scludam.membership

# scludam, Star CLUster Detection And Membership estimation package
# Copyright (C) 2022  Simón Pedro González

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.


"""Module for Density Based Membership Estimation."""

from copy import deepcopy
from typing import List, Union

import numpy as np
from attrs import Factory, define, field, validators

from scludam.hkde import HKDE
from scludam.plots import (
    pairprobaplot,
    scatter3dprobaplot,
    surfprobaplot,
    tsneprobaplot,
)
from scludam.type_utils import (
    Numeric1DArray,
    Numeric2DArray,
    OptionalNumeric2DArray,
    OptionalNumericArray,
    _type,
)


@define
class DBME:
    """Density Based Membership Estimation.

    It uses :class:`~scludam.hkde.HKDE` to estimate the density and
    calculate smooth membership probabilities for each class, given data
    and initial probabilities.

    Attributes
    ----------
    n_iters : int
        Number of iterations, by default 2. In each iteration, prior
        probabilities are updated according to the posterior probabilities
        of the previous iteration.
    kernel_calculation_mode : str
        Mode of kernel calculation, by default ``per_class``. It indicates
        how many :class:`~scludam.hkde.HKDE` estimators will be used to
        estimate the density. Available modes are:

        *  ``same``: the bandwidth of the kernels is the same for all
           classes. There will be one estimator.
        *  ``per_class``: the bandwidth of the kernels is different for
           each class. There will be one estimator per class.
        *  ``per_class_per_iter``: the bandwidth of the kernels is
           different for each class and iteration. There will be one
           estimator per class which will be updated in each iteration,
           recalculating the bandwidth each time.

    kde_leave1out : bool
        Whether to use leave-one-out KDE estimation, by default True.
    pdf_estimator : Union[:class:`~scludam.hkde.HKDE`, List[HKDE]]
        Estimator used to estimate the density, by default an instance of
        HKDE with default parameters. If a list is provided, it is assumed
        to contain either one estimator per class, or two estimators: the
        first for the first class and the second for the rest.
    n_classes : int
        Number of detected classes. Only available after the
        :func:`~scludam.membership.DBME.fit` method is called.
    labels : Numeric1DArray
        Labels of the classes, only available after the
        :func:`~scludam.membership.DBME.fit` method is called.
    counts : Numeric1DArray
        Number of data points in each class, only available after the
        :func:`~scludam.membership.DBME.fit` method is called.
    priors : Numeric1DArray
        Prior probabilities of each class, only available after the
        :func:`~scludam.membership.DBME.fit` method is called.
    posteriors : Numeric2DArray
        Posterior probabilities array of shape (n_datapoints, n_classes),
        only available after the :func:`~scludam.membership.DBME.fit`
        method is called.

    Examples
    --------
    .. literalinclude:: ../../examples/membership/dbme.py
        :language: python
        :linenos:

    .. image:: ../../examples/membership/init_proba.png
    .. image:: ../../examples/membership/dbme.png

    """

    # input attrs
    n_iters: int = field(default=2, validator=[_type(int), validators.gt(0)])
    kde_leave1out: bool = field(default=True, validator=_type(bool))
    kernel_calculation_mode: str = field(
        validator=validators.in_(["same", "per_class", "per_class_per_iter"]),
        default="per_class",
    )
    # Factory(HKDE) (instead of default=HKDE()) ensures each DBME instance
    # gets its own fresh estimator: a plain instance default is created once
    # at class-definition time and would be shared (and mutated by fit)
    # across all DBME instances.
    pdf_estimator: HKDE = field(
        default=Factory(HKDE), validator=_type(Union[HKDE, List[HKDE]])
    )

    # internal attrs
    _n: int = None
    _d: int = None
    _unique_labels: Numeric1DArray = None
    _data: Numeric2DArray = None
    _estimators: list = Factory(list)
    _n_estimators: int = None
    _iter_priors: list = Factory(list)
    _iter_counts: list = Factory(list)
    _iter_label_diff: list = Factory(list)
    _iter_labels: list = Factory(list)

    # output attrs
    n_classes: int = None
    labels: Numeric1DArray = None
    counts: Numeric1DArray = None
    posteriors: Numeric2DArray = None
    priors: Numeric1DArray = None

    def _update_class_mixtures(self, posteriors: Numeric2DArray):
        """Update labels, counts and priors from a posterior array.

        Also records per-iteration history (labels, counts, priors and the
        number of datapoints whose hard label changed between iterations).
        """
        # Column 0 is taken as the noise/field class, so hard labels are
        # shifted to start at -1 (matching clusterer-style labeling).
        self.labels = np.argmax(posteriors, axis=1) - 1
        self._iter_labels.append(self.labels)
        if len(self._iter_labels) > 1:
            # how many datapoints changed hard label since last iteration
            label_diff = (
                (self._iter_labels[-1] != self._iter_labels[-2]).astype(int).sum()
            )
            self._iter_label_diff.append(label_diff)
        # soft counts: sum of membership probabilities per class
        self.counts = posteriors.sum(axis=0)
        self._iter_counts.append(self.counts)
        self.priors = self.counts / self._n
        self._iter_priors.append(self.priors)

    def _get_posteriors(self, densities):
        # Dispatch to the prior-weighted posterior calculation.
        return self._get_posteriors1(densities)

    def _get_posteriors1(self, densities: Numeric2DArray):
        # probability calculation
        # P(Ci|x) = Di(x) * P(Ci) / sumj(Dj(x) * P(Cj))
        # counts are proportional to priors, so the normalization cancels.
        total_density = (
            (densities * self.counts)
            .sum(axis=1, keepdims=True)
            .repeat(self.n_classes, axis=1)
        )
        posteriors = densities * self.counts / total_density
        # NaN appears when all class densities are 0 for a datapoint
        # (0/0); treat those points as having zero membership everywhere.
        posteriors[np.isnan(posteriors)] = 0
        return posteriors

    def _get_posteriors2(self, densities):
        # probability calculation (alternative, unweighted by priors)
        # P(Ci|x) = Di(x) / sumj(Dj(x))
        total_density = densities.sum(axis=1, keepdims=True).repeat(
            self.n_classes, axis=1
        )
        posteriors = densities / total_density
        return posteriors

    def _fit_several_estimators(
        self,
        data: Numeric2DArray,
        err: OptionalNumeric2DArray,
        corr: OptionalNumericArray,
        weights: OptionalNumeric2DArray,
    ):
        """Fit the per-class estimators when a list of HKDEs was given."""
        first_iter = not self._estimators
        each_time = self.kernel_calculation_mode == "per_class_per_iter"
        # Estimators are (re)fitted on the first iteration, or on every
        # iteration in per_class_per_iter mode.
        if first_iter or each_time:
            self._estimators = []
            if self._n_estimators == 2:
                # one estimator for the first class; the 2nd is deep-copied
                # for each remaining class (no need to copy the first one).
                self._estimators.append(
                    self.pdf_estimator[0].fit(
                        data=data,
                        err=err,
                        corr=corr,
                        weights=weights[:, 0],
                    )
                )
                for i in range(1, self.n_classes):
                    self._estimators.append(
                        deepcopy(self.pdf_estimator[1]).fit(
                            data=data,
                            err=err,
                            corr=corr,
                            weights=weights[:, i],
                        ),
                    )
            else:
                # assume n_estimators == n_classes (checked in
                # _validate_n_estimators)
                for i in range(self.n_classes):
                    self._estimators.append(
                        self.pdf_estimator[i].fit(
                            data=data,
                            err=err,
                            corr=corr,
                            weights=weights[:, i],
                        ),
                    )

    def _fit_one_estimator(
        self,
        data: Numeric2DArray,
        err: OptionalNumeric2DArray,
        corr: OptionalNumericArray,
        weights: OptionalNumeric2DArray,
    ):
        """Fit estimator(s) when a single HKDE instance was given."""
        first_iter = not self._estimators
        each_time = self.kernel_calculation_mode == "per_class_per_iter"
        if first_iter or each_time:
            if self.kernel_calculation_mode == "same":
                # one shared-bandwidth estimator for all classes
                self._estimators = [self.pdf_estimator.fit(data, err, corr)]
            else:
                # one deep-copied estimator per class, each with its own
                # class-weighted bandwidth
                self._estimators = []
                for i in range(self.n_classes):
                    self._estimators.append(
                        deepcopy(self.pdf_estimator).fit(
                            data=data,
                            err=err,
                            corr=corr,
                            weights=weights[:, i],
                        ),
                    )

    def _get_densities(
        self,
        data: Numeric2DArray,
        err: OptionalNumeric2DArray,
        corr: OptionalNumericArray,
        weights: OptionalNumeric2DArray,
    ):
        """Return the (n_datapoints, n_classes) density matrix."""
        densities = np.zeros((self._n, self.n_classes))
        if self._n_estimators == 1:
            self._fit_one_estimator(data, err, corr, weights)
        else:
            self._fit_several_estimators(data, err, corr, weights)
        # pdf estimation per class
        for i in range(self.n_classes):
            if self.kernel_calculation_mode == "same":
                # single shared estimator; only the weights change per class
                class_estimator = self._estimators[0]
            else:
                class_estimator = self._estimators[i]
            densities[:, i] = class_estimator.set_weights(weights[:, i]).pdf(
                data, leave1out=self.kde_leave1out
            )
        return densities

    def _validate_n_estimators(self):
        """Validate pdf_estimator against n_classes; set _n_estimators.

        Raises
        ------
        ValueError
            If the number of estimators is incompatible with the
            kernel_calculation_mode or the number of detected classes.
        """
        # single HKDE instance: one estimator
        if isinstance(self.pdf_estimator, HKDE):
            self._n_estimators = 1
            return
        # is a list of HKDE
        self._n_estimators = len(self.pdf_estimator)
        if self._n_estimators == 1:
            # normal case: unwrap the single estimator
            self.pdf_estimator = self.pdf_estimator[0]
            return
        # if len > 1 then kernel_calculation_mode should be
        # either per_class or per_class_per_iter
        if self.kernel_calculation_mode not in [
            "per_class",
            "per_class_per_iter",
        ]:
            raise ValueError("kernel_calculation_mode and n_estimators mismatch")
        # now check against n_classes; n_estimators can only be
        # == n_classes, or 2 (field, clusters).
        # n_classes == 1 is not possible here because fit already
        # returned early in that case.
        if self._n_estimators != self.n_classes and self._n_estimators != 2:
            raise ValueError("n_estimators should be 1, 2 or n_classes")
        return
[docs] def fit( self, data: Numeric2DArray, init_proba: Numeric2DArray, err: OptionalNumeric2DArray = None, corr: OptionalNumericArray = None, ): """Fit models and calculate posteriors probabilities. The method takes data and initial probabilities and creates density estimators. Prior probabilities are taken from the initial probabilities. In each iteration, the method calculates the posterior probabilities of each datapoint using the density estimates and prior probabilites. Also, the method updates the prior probabilities considering the posterior probabilities of the past iteration. ``n_iters=1`` uses prior probabilities as provided in the initial probabilities array. ``n_iters=2`` (recommended), updates the prior probabilities once. Parameters ---------- data : Numeric2DArray Data matrix. init_proba : Numeric2DArray Initial posterior probability array. Must be of shape (n_samples, n_classes). This probabilities are used to create the initial density estimators per class. err : OptionalNumeric2DArray, optional Error parameter to be passed to :func:`~scludam.hkde.HKDE.fit`, by default None. corr : OptionalNumericArray, optional Correlation parameter to be passed to :func:`~scludam.hkde.HKDE.fit`, by default None. Returns ------- DBME Fitted instance of the :class:`~scludam.membership.DBME` class. """ self._n, self._d = data.shape self._data = data self.labels = np.argmax(init_proba, axis=1) - 1 self._unique_labels = np.unique(self.labels) self.n_classes = len(self._unique_labels) self.posteriors = init_proba self._update_class_mixtures(posteriors=init_proba) # case no clusters found if self.n_classes == 1: # there are no populations to fit return self self._validate_n_estimators() for i in range(self.n_iters): # is copy actually needed? 
previous_posteriors = self.posteriors.copy() weights = previous_posteriors densities = self._get_densities(data, err, corr, weights) self.posteriors = self._get_posteriors(densities) # if np.any(self.posteriors) < 0: # print('stop') # if np.any(self.posteriors) > 1: # print('stop') self._update_class_mixtures(self.posteriors) return self
[docs] def pairplot(self, **kwargs): """Plot the clustering results in a pairplot. It uses the :func:`~scludam.plots.pairprobaplot`. The colors of the points represent class labels. The sizes of the points reresent the probability of belonging to the most probable class. Returns ------- seaborn.PairGrid Pairplot of the clustering results. Raises ------ Exception If the clustering has not been performed yet. """ if not self._is_fitted(): raise Exception("Clusterer not fitted. Try excecuting fit function first.") return pairprobaplot( data=self._data, proba=self.posteriors, labels=self.labels, **kwargs )
[docs] def tsneplot(self, **kwargs): """Plot the clustering results in a t-SNE plot. It uses the :func:`~scludam.plots.tsneprobaplot` function. It represents the data in a 2 dimensional space using t-SNE. The colors of the points represent class labels. The sizes of the points represent the probability of belonging to the most probable class. Returns ------- matplotlib.axes.Axes Plot of the clustering results. Raises ------ Exception If the clustering has not been performed yet. """ if not self._is_fitted(): raise Exception("Clusterer not fitted. Try excecuting fit function first.") return tsneprobaplot( data=self._data, proba=self.posteriors, labels=self.labels, **kwargs )
def _is_fitted(self): return ( self.labels is not None and self.posteriors is not None and self._data is not None and self.priors is not None )
[docs] def scatter3dplot(self, **kwargs): """Plot the clustering results in a 3D scatter plot. It uses the :func:`~scludam.plots.scatter3dprobaplot` function. It represents the data in a 3 dimensional space using the variables given by the user. The colors of the points represent class labels. The sizes of the points represent the probability of belonging to the most probable class. Returns ------- matplotlib.collections.PathCollection Plot of the clustering results. Raises ------ Exception If the clustering has not been performed yet. """ if not self._is_fitted(): raise Exception("Clusterer not fitted. Try excecuting fit function first.") return scatter3dprobaplot(self._data, self.posteriors, **kwargs)
[docs] def surfplot(self, **kwargs): """Plot the clustering results in a 3D surface plot. It uses the :func:`~scludam.plots.surfprobaplot` function. The heights of the surface and colors of the points represent the probability of belonging to the most probable cluster, excluding the noise class. The data is represented in two dimensions, given by the user. Returns ------- matplotlib.collections.PathCollection Plot of the clustering results. Raises ------ Exception If the clustering has not been performed yet. """ if not self._is_fitted(): raise Exception("Clusterer not fitted. Try excecuting fit function first.") return surfprobaplot(self._data, self.posteriors, **kwargs)