Source code for scludam.hkde

# scludam, Star CLUster Detection And Membership estimation package
# Copyright (C) 2022  Simón Pedro González

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""Module for Kernel Density Estimation with variable bandwidth matrices.

The module provides a class for multivariate Kernel Density Estimation with a bandwidth
matrix per observation. Such matrices are created from a baseline bandwidth calculated
with the Plugin or Rule of Thumb (Scott or Silverman) methods. Per-observation errors
and covariances can be added to the matrices.

"""

from abc import abstractmethod
from itertools import product
from numbers import Number
from typing import List, Optional, Tuple, Union

import matplotlib.pyplot as plt
import numpy as np
from attrs import define, field, validators
from beartype import beartype
from rpy2.robjects import r
from scipy.stats import multivariate_normal
from sklearn.neighbors import BallTree
from statsmodels.stats.correlation_tools import corr_nearest, cov_nearest

from scludam.plots import bivariate_density_plot, univariate_density_plot
from scludam.rutils import (
    assign_r_args,
    clean_r_session,
    disable_r_console_output,
    disable_r_warnings,
    load_r_packages,
)
from scludam.type_utils import (
    ArrayLike,
    Numeric1DArray,
    Numeric1DArrayLike,
    Numeric2DArray,
    NumericArray,
    OptionalNumeric1DArrayLike,
    OptionalNumeric2DArray,
    OptionalNumericArray,
    _type,
)

disable_r_warnings()
disable_r_console_output()
load_r_packages(r, ["ks"])


class BandwidthSelector:
    """Base class for bandwidth selectors."""
    @abstractmethod
    def get_bw(self, data: Numeric2DArray, *args, **kwargs):
        """Get the bandwidth for the given data.

        Parameters
        ----------
        data : Numeric2DArray
            Data.

        """
        pass
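
# Illustrative sketch, not part of the library API: a custom selector only
# needs to subclass BandwidthSelector and implement get_bw returning a
# (d, d) bandwidth matrix. The class name and the 0.5 value are hypothetical.
class _ExampleFixedSelector(BandwidthSelector):  # pragma: no cover
    def get_bw(self, data: Numeric2DArray, *args, **kwargs):
        # constant isotropic bandwidth, regardless of the data
        return np.eye(data.shape[1]) * 0.5
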
@define
class PluginSelector(BandwidthSelector):
    """Bandwidth selector based on the Plugin method.

    It uses the Plugin method with unconstrained pilot bandwidth [1]_ [2]_
    as implemented in the `ks` R package [3]_. See the `ks` package
    documentation for more information on parameter values. All attributes
    are passed to the ``ks::Hpi`` function.

    Attributes
    ----------
    nstage : int, optional
        Number of calculation stages, can be 1 or 2, by default 2.
    pilot : str, optional
        Kind of pilot bandwidth.
    binned : bool, optional
        Use binned estimation, by default False.
    diag : bool, optional
        Whether to use the diagonal bandwidth, by default False. If True,
        ``ks::Hpi.diag`` is used.

    References
    ----------
    .. [1] Chacon, J.E., Duong, T. (2010). Multivariate plug-in bandwidth
        selection with unconstrained pilot matrices. Test, 19, 375-398.
    .. [2] Chacon, J.E., Duong, T. (2018). Multivariate Kernel Smoothing and
        Its Applications (pp. 43-66).
    .. [3] Duong, T. (2013). ks: Kernel Smoothing. R package version 1.13.3.
        https://CRAN.R-project.org/package=ks

    """

    nstage: int = None
    pilot: str = None
    binned: bool = None
    diag: bool = False

    def _build_r_command(self, data: Numeric2DArray):
        params = {
            "nstage": self.nstage,
            "pilot": self.pilot,
            "binned": self.binned,
        }
        # delete all previous R session variables
        clean_r_session(r, "var")
        _, rparams = assign_r_args(r, x=data, **params)
        return f'ks::Hpi{".diag" if self.diag else ""}({rparams})'
    @beartype
    def get_bw(self, data: Numeric2DArray):
        """Get the bandwidth for the given data.

        Builds an R ``ks::Hpi`` command and executes it.

        Parameters
        ----------
        data : Numeric2DArray
            Data.

        Returns
        -------
        Numeric2DArray
            Optimal bandwidth matrix H according to the Plugin method.

        """
        command = self._build_r_command(data)
        result = r(command)
        return np.asarray(result)
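
# Illustrative sketch, not part of the library API: PluginSelector computes
# the baseline bandwidth through the R "ks" package, so a working R
# installation with "ks" is required. The function name and data below are
# hypothetical.
def _example_plugin_bandwidth():  # pragma: no cover
    rng = np.random.default_rng(0)
    data = rng.normal(size=(500, 2))
    h_full = PluginSelector().get_bw(data)  # full (2, 2) bandwidth matrix
    h_diag = PluginSelector(diag=True).get_bw(data)  # uses ks::Hpi.diag
    return h_full, h_diag
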
@define
class RuleOfThumbSelector(BandwidthSelector):
    """Bandwidth selector based on the Rule of Thumb method.

    Attributes
    ----------
    rule : str, optional
        Name of the rule of thumb to use, by default "scott". Can be "scott"
        or "silverman".
    diag : bool, optional
        Whether to use the diagonal bandwidth, by default False.

    Raises
    ------
    ValueError
        If rule is not "scott" or "silverman".

    """

    rule: str = field(default="scott", validator=validators.in_(["scott", "silverman"]))
    diag: bool = False

    def _scotts_factor(self, data):
        n = data.shape[0]
        d = data.shape[1]
        return np.power(n, -1.0 / (d + 4))

    def _silverman_factor(self, data):
        n = data.shape[0]
        d = data.shape[1]
        return np.power(n * (d + 2.0) / 4.0, -1.0 / (d + 4))

    def _get_factor(self, data):
        if self.rule == "scott":
            return self._scotts_factor(data)
        elif self.rule == "silverman":
            return self._silverman_factor(data)
        else:
            raise ValueError("Invalid rule")

    def _get_data_covariance(self, data, weights: Union[Numeric1DArray, None]):
        kws = {}
        if weights is not None:
            kws["aweights"] = weights
        if self.diag:
            data_covariance = np.diagflat(
                [
                    np.cov(data[:, i], rowvar=False, bias=False, **kws)
                    for i in range(data.shape[1])
                ]
            )
        else:
            data_covariance = np.cov(data, rowvar=False, bias=False, **kws)
            # project onto the nearest valid (positive semidefinite) covariance
            data_covariance = cov_nearest(data_covariance)
        return data_covariance
    @beartype
    def get_bw(self, data: Numeric2DArray, weights: Union[None, Numeric1DArray] = None):
        """Calculate the bandwidth matrix using the rule of thumb.

        Parameters
        ----------
        data : Numeric2DArray
            Data to be used.
        weights : Union[None, Numeric1DArray], optional
            Optional weights to be used, by default None.

        Returns
        -------
        Numeric2DArray
            Bandwidth matrix H.

        """
        # This function does not consider the case of weights being zero,
        # so they should be at least > 1e-08.
        data_covariance = self._get_data_covariance(data, weights)
        factor = self._get_factor(data)
        return data_covariance * factor**2
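
# Illustrative sketch, not part of the library API: the rule-of-thumb
# bandwidth is the (optionally weighted) data covariance scaled by the
# squared factor, n**(-1/(d+4)) for Scott and (n*(d+2)/4)**(-1/(d+4)) for
# Silverman. The function name and data below are hypothetical.
def _example_rule_of_thumb_bandwidth():  # pragma: no cover
    rng = np.random.default_rng(0)
    data = rng.normal(size=(1000, 3))
    h_scott = RuleOfThumbSelector(rule="scott").get_bw(data)
    # manual computation for the unweighted, non-diagonal case; should match
    # h_scott up to the nearest-covariance correction applied internally
    factor = data.shape[0] ** (-1.0 / (data.shape[1] + 4))
    h_manual = np.cov(data, rowvar=False) * factor**2
    return h_scott, h_manual
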
@define
class HKDE:
    """Kernel Density Estimation with variable bandwidth matrices (H).

    Only for multivariate data (2D to nD). As it does not use FFT-based
    KDE, it may take some time for big datasets.

    Attributes
    ----------
    bw : Union[BandwidthSelector, Number, NumericArray, List[Number], str], optional
        Bandwidth to be used, by default an instance of
        :class:`~scludam.hkde.PluginSelector`. It can be:

        * an instance of :class:`~scludam.hkde.BandwidthSelector`: the base
          bandwidth is calculated using the given selector.
        * a Number: the base bandwidth is a diagonal matrix with the given
          value in the diagonal.
        * an Array: the base bandwidth is taken as the given array. The
          array shape must be (d,) for diagonal values or (d, d) for a full
          matrix, where d is the number of dimensions.
        * a String: the name of the rule of thumb to be used. One of
          "scott" or "silverman".
    error_convolution : bool, optional
        When True:

        * It can only estimate density for the same points as the data.
          That is, eval points are equal to data points.
        * It is always a leave-one-out estimation.
        * To calculate the contribution of point A to the density evaluated
          at point B, the bandwidth matrices of both points are convolved.
        * This option should be used to get an accurate measure of the
          density at the data points considering the uncertainty of all
          points, themselves included.
        * As a new matrix is calculated for each combination of points, it
          is the slowest option. Although it has been optimized with a ball
          tree to reduce the number of matrices used, it could be
          problematic for big concentrated datasets.
        * Default is False.

    Examples
    --------
    .. literalinclude:: ../../examples/hkde/hkde.py
        :language: python
        :linenos:

    .. image:: ../../examples/hkde/hkde.png

    Additional illustrative sketches are included at the end of this module.

    """

    # input attrs
    bw: Union[BandwidthSelector, Number, NumericArray, List[Number], str] = field(
        default=PluginSelector(),
        validator=_type(
            Union[BandwidthSelector, Number, List[Number], NumericArray, str]
        ),
    )
    error_convolution: bool = False

    # internal attrs
    _kernels: ArrayLike = None
    _weights: Numeric1DArrayLike = None
    _covariances: NumericArray = None
    _base_bw: NumericArray = None
    _data: Numeric2DArray = None
    _n: int = None
    _d: int = None
    _n_eff: int = None
    _eff_mask: ArrayLike = None
    _maxs: Numeric1DArray = None
    _mins: Numeric1DArray = None
    @beartype
    def set_weights(self, weights: ArrayLike):
        """Set the weights for each data point.

        Set a weight value for each data point, between 0 and 1.

        Parameters
        ----------
        weights : ArrayLike
            Weights for each data point.

        Returns
        -------
        HKDE
            Instance of :class:`~scludam.hkde.HKDE`.

        Raises
        ------
        ValueError
            If weights are not between 0 and 1 or do not match the correct
            dimensions (array of shape (n,)).

        """
        weights = np.asarray(weights)
        if len(weights.shape) != 1:
            raise ValueError("Weights must be a 1d np.ndarray.")
        if np.any(weights > 1) or np.any(weights < 0):
            raise ValueError("Weight values must belong to [0, 1].")
        if weights.shape[0] != self._n:
            raise ValueError("Weights must have the same n as data.")
        self._weights = weights
        # default atol used by numpy isclose is 1e-08
        self._eff_mask = self._weights > 1e-08
        self._n_eff = np.sum(self._weights[self._eff_mask])
        return self
    def _get_err_matrices(self, err: Numeric2DArray, corr: NumericArray = None):
        if err.shape != (self._n, self._d):
            raise ValueError("Error array must have the same shape as data array.")
        if corr is None:
            return np.apply_along_axis(lambda x: np.diag(x**2), -1, err)
        corr_matrices = self._get_corr_matrices(corr)
        return (
            np.apply_along_axis(lambda x: x * np.atleast_2d(x).T, -1, err)
            * corr_matrices
        )

    def _get_corr_matrices(self, corr: NumericArray):
        n, d = (self._n, self._d)
        if corr.shape == (d, d):
            # correlation is given as a global correlation matrix per dims.
            # First find the nearest correct correlation matrix, which
            # ensures the matrix is positive semidefinite.
            corr = corr_nearest(corr)
            if not np.allclose(np.diag(corr), np.ones(d)):
                raise ValueError("Correlation matrix must have 1 in diagonal")
            return np.repeat(corr, repeats=n, axis=1).reshape((d, d, n)).T
        elif corr.shape == (n, int(d * (d - 1) / 2)):
            # correlation is given per observation, per dims, as pairwise
            # corr coefs (no need for the 1s given by corr(samevar, samevar)).
            # Example: for 1 obs and 4 vars, the lower triangle of the corr
            # matrix looks like:
            # 12
            # 13 23
            # 14 24 34
            # so the method should receive obs1 => [12, 13, 23, 14, 24, 34]
            n_corrs = corr.shape[1]
            corrs = np.zeros((n, d, d))
            tril_idcs = tuple(
                map(
                    tuple,
                    np.vstack(
                        (
                            np.arange(n).repeat(n_corrs),
                            np.tile(
                                np.array(np.tril_indices(d, k=-1)),
                                (n,),
                            ),
                        )
                    ),
                )
            )
            corrs[tril_idcs] = corr.ravel()
            corrs = corrs + np.transpose(corrs, (0, 2, 1))
            diag_idcs = tuple(
                map(
                    tuple,
                    np.vstack(
                        (
                            np.arange(n).repeat(d),
                            np.tile(np.array(np.diag_indices(d)), (n,)),
                        )
                    ),
                )
            )
            corrs[diag_idcs] = 1
            return corrs
        else:
            raise ValueError("Wrong corr dimensions")

    def _get_bw_matrices(self, data: Numeric2DArray):
        if isinstance(self.bw, BandwidthSelector):
            bw_matrix = self.bw.get_bw(data[self._eff_mask])
        elif isinstance(self.bw, str):
            bw_matrix = RuleOfThumbSelector(rule=self.bw).get_bw(
                data[self._eff_mask], self._weights[self._eff_mask]
            )
        elif isinstance(self.bw, (np.ndarray, list)):
            if isinstance(self.bw, list):
                self.bw = np.array(self.bw)
            if len(self.bw.shape) == 1 and self.bw.shape[0] == self._d:
                bw_matrix = np.diag(self.bw)
            elif self.bw.shape == (self._d, self._d):
                bw_matrix = self.bw
            else:
                raise ValueError("Incorrect shape of bandwidth array")
        else:
            # bw is a Number: use it as the diagonal value of the
            # bandwidth matrix
            bw_matrix = np.eye(self._d) * self.bw
        self._base_bw = bw_matrix
        return np.repeat(bw_matrix[:, np.newaxis], self._n, 1).swapaxes(0, 1)

    def _get_cov_matrices(
        self,
        data: Numeric2DArray,
        err: Numeric2DArray = None,
        corr: NumericArray = None,
    ):
        bw_matrices = self._get_bw_matrices(data)
        if err is None:
            if corr is not None:
                raise ValueError("corr cannot be used without err.")
            return bw_matrices
        err_matrices = self._get_err_matrices(err, corr)
        # the sum of covariance matrices convolves a kernel for bw
        # and a kernel for error
        return bw_matrices + err_matrices
    @beartype
    def fit(
        self,
        data: Numeric2DArray,
        err: OptionalNumeric2DArray = None,
        corr: OptionalNumericArray = None,
        weights: OptionalNumeric1DArrayLike = None,
        *args,
        **kwargs,
    ):
        """Fit a KDE model to the provided data.

        Creates covariance matrices and kernel instances.

        Parameters
        ----------
        data : Numeric2DArray
            Data.
        err : OptionalNumeric2DArray, optional
            Error array of shape (n, d), by default None. Errors are added
            to the base bandwidth matrix to create individual H matrices
            per datapoint.
        corr : OptionalNumericArray, optional
            Correlation coefficients, by default None. Coefficients are
            added to the base bandwidth matrix to create individual H
            matrices per datapoint. Can be one of:

            * NumericArray of shape (d, d): global correlation matrix.
              Applied in every bandwidth matrix H.
            * Numeric2DArray of shape (n, d * (d - 1) / 2): individual
              correlation matrices. Each column of the array represents a
              correlation between two variables, for all observations.
              Order of columns must follow a lower triangle matrix. For
              example: for four variables, the lower triangle of the corr
              matrix looks like:

              .. code-block:: python

                corr(v1, v2)
                corr(v1, v3), corr(v2, v3)
                corr(v1, v4), corr(v2, v4), corr(v3, v4)

              So a valid ``corr`` array for two datapoints would be:

              .. code-block:: python

                np.array([
                    [
                        corr1(v1, v2),
                        corr1(v1, v3), corr1(v2, v3),
                        corr1(v1, v4), corr1(v2, v4), corr1(v3, v4),
                    ],
                    [
                        corr2(v1, v2),
                        corr2(v1, v3), corr2(v2, v3),
                        corr2(v1, v4), corr2(v2, v4), corr2(v3, v4),
                    ],
                ])

        weights : OptionalNumeric1DArrayLike, optional
            Weights to be used for each data point, by default None. If
            None, all datapoints have the same weight.

        Returns
        -------
        HKDE
            Instance of :class:`~scludam.hkde.HKDE`.

        Notes
        -----
        The base bandwidth matrix is calculated from the ``bw`` parameter.
        If no additional parameters are provided, the base bandwidth matrix
        is used for all datapoints. If ``err`` and/or ``corr`` are provided,
        they are used to create individual covariance matrices for each
        datapoint [5]_. The final matrix used for each kernel is the sum of
        the base matrix and the individual covariance matrix, which is
        equivalent to convolving two gaussian kernels, one for the base
        bandwidth matrix and one for the individual covariance matrix. The
        base bandwidth is considered the minimum bandwidth of the KDE
        process, for a data point without uncertainty, while the final
        matrix incorporates the uncertainty if provided.

        References
        ----------
        .. [5] Luri, X. et al. (2018). Gaia Data Release 2: using Gaia
            parallaxes. Astronomy and Astrophysics, 616, A9.
            doi: 10.1051/0004-6361/201832964

        """  # noqa: E501
        self._n, self._d = data.shape
        self._data = data
        self._maxs = data.max(axis=0)
        self._mins = data.min(axis=0)
        weights = weights if weights is not None else np.ones(self._n)
        self.set_weights(weights)
        self._covariances = self._get_cov_matrices(data, err, corr)
        self._kernels = np.array(
            [
                multivariate_normal(
                    data[i],
                    self._covariances[i],
                    *args,
                    **kwargs,
                )
                for i in range(self._n)
            ]
        )
        return self
    def _is_fitted(self):
        return not (
            self._kernels is None
            or self._weights is None
            or self._n_eff is None
            or self._eff_mask is None
            or self._covariances is None
        )

    def _calculate_biggest_hypersphere(self):
        # use the sum of the diagonal as a proxy for the spread of each
        # covariance matrix
        sums = np.array([np.diagonal(cc).sum() for cc in self._covariances])
        # take the matrix closest to the 99th percentile of the sums
        biggest_cov = np.percentile(sums, 99)
        closest_cov = np.argmin(np.abs(sums - biggest_cov))
        biggest_matrix = self._covariances[closest_cov]
        # create a multivariate normal around 0 with that matrix,
        # accounting for dims
        biggest_kde = multivariate_normal(np.zeros(self._d), biggest_matrix)
        # evaluate the pdf on a grid and take the largest distance from 0
        # at which the pdf is still > 1e-08 as the radius of the
        # hypersphere of non-negligible contributions
        grid_range = (-3, 3)
        resolution = 10
        threshold = 1e-08
        grid_linspace = np.linspace(grid_range[0], grid_range[1], resolution)
        points = np.array(list(product(grid_linspace, repeat=self._d)))
        pdf_values = biggest_kde.pdf(points)
        points_above_threshold = points[pdf_values > threshold]
        distances = np.linalg.norm(points_above_threshold, axis=1)
        radius = np.max(distances)
        return radius

    def _build_tree_ball(
        self,
        radius: float,
        neighbours: Numeric2DArray,
        eval_points: Numeric2DArray,
    ):
        # build a ball tree with the data and get, for each eval point,
        # the indices of the points that are inside the ball
        tree = BallTree(neighbours)
        return tree.query_radius(eval_points, radius)

    def _pdf_with_error_convolution(self):
        if not self._is_fitted():
            raise Exception("Model not fitted. Try executing fit function first.")
        eval_points = np.asarray(self._data)
        obs, dims = eval_points.shape
        if dims != self._d:
            raise ValueError("Eval points must have same dims as data.")
        if obs < 1:
            raise ValueError("Eval points cannot be empty")
        if self._n_eff <= 0:
            return np.zeros(obs)

        all_covariances = self._covariances
        weights = self._weights[self._eff_mask]
        covariances = self._covariances[self._eff_mask]
        data = self._data[self._eff_mask]
        n = self._n_eff

        neighbour_idcs = self._build_tree_ball(
            self._calculate_biggest_hypersphere(), data, self._data
        )

        # put weights and normalization together in each step
        # pdf(point) = sum(ki(point)*wi/(sum(w)-wi))
        norm_weights = weights / (n - weights)
        pdf = np.zeros(obs)
        for j, p in enumerate(eval_points):
            # indices of the data points inside the ball around p
            indexes = neighbour_idcs[j]
            point_cov = all_covariances[j]
            applied_ks = 0
            for idx in indexes:
                mean = data[idx]
                if not np.allclose(mean, p):
                    # convolve both kernels by summing their covariances
                    cov = covariances[idx] + point_cov
                    k = multivariate_normal(mean, cov)
                    applied_ks += k.pdf(p) * norm_weights[idx]
            pdf[j] = applied_ks
        if obs == 1:
            # return as float value
            return pdf[0]
        return pdf
    def pdf(self, eval_points: Numeric2DArray, leave1out: bool = True):
        """Probability density function.

        Evaluate the probability density function at the provided points,
        using the fitted KDE model.

        Parameters
        ----------
        eval_points : Numeric2DArray
            Observation or observations to evaluate the PDF at.
        leave1out : bool, optional
            Whether to set the weight to 0 for the point being evaluated,
            by default True.

        Returns
        -------
        Numeric1DArray
            PDF values for the provided points.

        Raises
        ------
        Exception
            If the KDE model is not fitted.
        ValueError
            If the shape of the provided points is not compatible with the
            fitted KDE model.

        """
        if self.error_convolution:
            # ignores the rest of the parameters, as it only works with
            # the data points and is always leave-one-out
            return self._pdf_with_error_convolution()
        if not self._is_fitted():
            raise Exception("Model not fitted. Try executing fit function first.")
        eval_points = np.asarray(eval_points)
        obs, dims = eval_points.shape
        if dims != self._d:
            raise ValueError("Eval points must have same dims as data.")
        if obs < 1:
            raise ValueError("Eval points cannot be empty")
        if self._n_eff <= 0:
            return np.zeros(obs)
        pdf = np.zeros(obs)
        kernels = self._kernels[self._eff_mask]
        weights = self._weights[self._eff_mask]
        n = self._n_eff
        # put weights and normalization together in each step
        # pdf(point) = sum(ki(point)*wi/(sum(w)-wi))
        if leave1out:
            # assumes eval_points are the data points, in the same order
            norm_weights = weights / (n - weights)
            for i, k in enumerate(kernels):
                applied_k = k.pdf(eval_points) * norm_weights[i]
                applied_k[i] = 0
                pdf += applied_k
        else:
            norm_weights = weights / n
            for i, k in enumerate(kernels):
                applied_k = k.pdf(eval_points) * norm_weights[i]
                pdf += applied_k
        if obs == 1:
            # return as float value
            return pdf[0]
        return pdf
    def plot(
        self,
        gr: int = 50,
        figsize: Tuple[int, int] = (8, 6),
        cols: Optional[str] = None,
        **kwargs,
    ):
        """Plot the KDE model applied in a grid.

        Creates a pairplot of the KDE model applied in a grid that spans
        between the data max and min values for each dimension.

        Parameters
        ----------
        gr : int, optional
            Grid resolution, number of bins to be taken into account for
            each dimension, by default 50. Note that data dimensions and
            grid resolution determine how many points are evaluated, as
            ``eval_points=gr**dims``. A high ``gr`` value can result in a
            long computation time.
        figsize : Tuple[int, int], optional
            Figure size, by default (8, 6).
        cols : Optional[str], optional
            Column names to plot over the axes, by default None.

        Returns
        -------
        Tuple[matplotlib.figure.Figure, np.ndarray]
            Figure and axes of the pairplot.

        Raises
        ------
        Exception
            If the KDE model is not fitted yet.

        """
        if not self._is_fitted():
            raise Exception("Model not fitted. Try executing fit function first.")
        # prepare data to plot
        n = gr
        linspaces = tuple(np.linspace(self._mins, self._maxs, num=n).T)
        grid = np.meshgrid(*linspaces, indexing="ij")
        data = np.vstack(tuple(map(np.ravel, grid))).T
        density = self.pdf(eval_points=data)
        data_and_density = np.vstack((data.T, density))
        grids = [axis.reshape(*tuple([n] * self._d)) for axis in data_and_density]
        density_grid = grids[-1]
        if cols is None:
            cols = [f"var{d+1}" for d in range(self._d)]
        fig, axes = plt.subplots(self._d, self._d, figsize=figsize)
        # fig.subplots_adjust(hspace=.5, wspace=.001)
        for i in range(self._d):
            for j in range(self._d):
                if i == j:
                    x = linspaces[i]
                    axis_to_sum = tuple(set(list(np.arange(self._d))) - {i})
                    if len(axis_to_sum):
                        y = density_grid.sum(axis=axis_to_sum)
                        y = y / gr
                        univariate_density_plot(x=x, y=y, ax=axes[i, i])
                else:
                    x, y = np.meshgrid(linspaces[j], linspaces[i])
                    axis_to_sum = tuple(set(list(np.arange(self._d))) - {i} - {j})
                    if len(axis_to_sum):
                        z = density_grid.sum(axis=axis_to_sum)
                    else:
                        z = density_grid
                    if i > j:
                        z = z.T
                    z = z / gr
                    _, im = bivariate_density_plot(
                        x=x, y=y, z=z, colorbar=False, ax=axes[i, j], **kwargs
                    )
        y_axes_to_join = [set() for _ in range(self._d)]
        for i in range(self._d):
            for j in range(self._d):
                if i != j:
                    # share x axis with the univariate density of same column
                    axes[i, j].sharex(axes[j, j])
                    axes[i, j].axis("tight")
                    if j == 0 or (j == 1 and i == 0):
                        for k in range(self._d):
                            if k != j and k != i:
                                y_axes_to_join[i].add(axes[i, j])
                                y_axes_to_join[i].add(axes[i, k])
        for i in range(self._d):
            yatj = list(y_axes_to_join[i])
            if len(yatj) > 1:
                yatj[0].get_shared_y_axes().join(yatj[0], *yatj[1:])
        for i in range(self._d):
            axes[-1, i].set_xlabel(cols[i])
            axes[i, 0].set_ylabel(cols[i])
        fig.colorbar(im, ax=axes.ravel().tolist())
        return fig, axes
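
# The sketches below are illustrative only and not part of the library API;
# function names, data sizes and parameter values are hypothetical. The
# "scott" bandwidth is used to avoid the R dependency of the default
# PluginSelector.

# Fitting with per-point errors: each kernel i gets its own covariance
# H_i = H_base + diag(err_i**2), since summing Gaussian covariances is
# equivalent to convolving a bandwidth kernel with an error kernel.
def _example_fit_with_errors():  # pragma: no cover
    rng = np.random.default_rng(0)
    data = rng.normal(size=(300, 2))
    err = np.abs(rng.normal(scale=0.1, size=(300, 2)))
    kde = HKDE(bw="scott").fit(data, err=err)
    # leave-one-out density at the data points themselves
    return kde.pdf(data, leave1out=True)


# Weighted estimation: weights in [0, 1] let each point contribute
# fractionally to the density, e.g. soft membership probabilities.
def _example_fit_with_weights():  # pragma: no cover
    rng = np.random.default_rng(0)
    data = rng.normal(size=(100, 2))
    weights = rng.uniform(size=100)
    kde = HKDE(bw="scott").fit(data, weights=weights)
    return kde.pdf(data)


# error_convolution=True restricts evaluation to the data points but
# convolves the bandwidth matrices of both points in each pairwise
# contribution; it is the slowest but most uncertainty-aware option.
def _example_error_convolution():  # pragma: no cover
    rng = np.random.default_rng(0)
    data = rng.normal(size=(100, 2))
    err = np.full((100, 2), 0.05)
    kde = HKDE(bw="scott", error_convolution=True).fit(data, err=err)
    return kde.pdf(data)  # always leave-one-out, eval points == data


# Evaluating on new points: leave1out only makes sense when the eval points
# are the data points themselves, so grids should use leave1out=False.
def _example_pdf_on_grid():  # pragma: no cover
    rng = np.random.default_rng(0)
    data = rng.normal(size=(200, 2))
    kde = HKDE(bw="scott").fit(data)
    grid = np.array(list(product(np.linspace(-3, 3, 20), repeat=2)))
    return kde.pdf(grid, leave1out=False)


# Pairplot of the fitted density over a grid spanning the data range.
def _example_plot():  # pragma: no cover
    rng = np.random.default_rng(0)
    data = rng.normal(size=(200, 2))
    fig, axes = HKDE(bw="scott").fit(data).plot(gr=30, cols=["x", "y"])
    plt.show()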