
# scludam, Star CLUster Detection And Membership estimation package
# Copyright (C) 2022  Simón Pedro González

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""Module for density peak detection in numerical data.

This module provides density peak detection over numerical data and helper functions
for tasks such as defining filtering masks. The main API of
the module includes :class:`~scludam.detection.CountPeakDetector`,
:func:`~scludam.detection.default_mask` and :func:`~scludam.detection.extend_1dmask`,
which can be directly imported from ``scludam``. Other functions and classes can be
imported from ``scludam.detection``.

"""

from copy import deepcopy
from decimal import Decimal
from numbers import Number
from typing import List, Union
from warnings import warn

import numpy as np
from astropy.stats.sigma_clipping import sigma_clipped_stats
from attrs import define, field, validators
from beartype import beartype
from numpy.typing import NDArray
from scipy import ndimage
from skimage.feature import peak_local_max

from scludam.masker import RangeMasker
from scludam.plots import heatmap2D, horizontal_lineplots
from scludam.type_utils import (
    ArrayLike,
    Numeric1DArrayLike,
    Numeric2DArray,
    OptionalArrayLike,
    OptionalNumeric1DArrayLike,
    _type,
)


@beartype
def default_mask(dim: int):
    """Create a default mean mask for a given dimension.

    It returns a mean weighted mask with 5 elements per dimension, to be
    used as a filter for convolution. The sum of the mask weights is equal
    to 1.

    Parameters
    ----------
    dim : int
        Dimension of the mask.

    Returns
    -------
    NDArray[np.number]
        Array with the mask.

    Notes
    -----
    The shape of the mask is chosen so it takes into account the values of
    neighboring bins, but not the value of the bin over which the mask is
    applied. The mask is intended to produce a good estimate of the local
    density of the background of the bin over which it is applied. This
    mask is used in the method applied by González-Alejo (2020) [1]_.

    References
    ----------
    .. [1] Alejo, A. D., González, J. F., González, S. P. (2020). Estudio
        de membresía de cúmulos estelares utilizando Gaia DR2. Cuaderno de
        Resúmenes 62a Reunión Anual Asociación Argentina de Astronomía,
        Rosario, Provincia de Santa Fe, 64.

    Examples
    --------
    .. literalinclude:: ../../examples/detection/default_mask.py
        :language: python
        :linenos:

    """
    indexes = np.array(np.meshgrid(*np.tile(np.arange(5), (dim, 1)))).T.reshape(
        (-1, dim)
    )
    mask = np.zeros([5] * dim)
    cond = np.sum((indexes - 2) ** 2, axis=1)
    mask[tuple(indexes[np.argwhere((cond > 0) & (cond < 5))].reshape((-1, dim)).T)] = 1
    return mask / np.count_nonzero(mask)

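# Illustrative sketch (not part of the scludam API): inspecting the default
# 2D mask. The nonzero weights surround, but exclude, the central bin, and
# the weights are normalized so they sum to 1.
def _example_default_mask():
    m = default_mask(2)
    print(m.shape)  # (5, 5)
    print(m[2, 2])  # 0.0: the central bin is excluded
    print(m.sum())  # 1.0: the weights are normalized
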
def _convolve(
    data, mask: OptionalArrayLike = None, c_filter: callable = None, *args, **kwargs
):
    if c_filter:
        return c_filter(data, *args, **kwargs)
    if mask is not None:
        return ndimage.convolve(data, mask, *args, **kwargs)


# unused
# def var_filter(data, mask=None, *args, **kwargs):
#     if mask is not None:
#         kwargs["footprint"] = mask != 0
#     return _convolve(
#         data,
#         c_filter=ndimage.generic_filter,
#         function=np.var,
#         *args,
#         **kwargs,
#     )


# unused
# def std_filter(data, mask=None, *args, **kwargs):
#     if mask is not None:
#         kwargs["footprint"] = mask != 0
#     return _convolve(
#         data,
#         c_filter=ndimage.generic_filter,
#         function=np.std,
#         *args,
#         **kwargs,
#     )

@beartype
def fast_std_filter(data: NDArray, mask: ArrayLike, **kwargs):
    """Fast standard deviation filter.

    To be applied over an image or histogram.

    Parameters
    ----------
    data : NDArray
        Image or n-dimensional histogram.
    mask : ArrayLike
        Mask to be used for the convolution.

    Returns
    -------
    NDArray
        Filtered image or histogram of the same dimensions.

    Notes
    -----
    It is possible to pass kwargs to the ``scipy.ndimage.convolve``
    function. Its default border mode is 'reflect'.

    """
    u_x2 = _convolve(data, mask=mask, **kwargs)
    ux_2 = _convolve(data * data, mask=mask, **kwargs)
    return (ux_2 - u_x2 * u_x2) ** 0.5

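# Illustrative sketch (not part of the scludam API): fast_std_filter
# computes a local standard deviation with two convolutions, using the
# identity std(x)^2 = E[x^2] - E[x]^2 evaluated under the mask weights.
def _example_fast_std_filter():
    hist = np.random.default_rng(0).poisson(5, size=(20, 20)).astype(float)
    local_std = fast_std_filter(hist, mask=default_mask(2))
    print(local_std.shape)  # (20, 20): same shape as the input
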
@beartype
def get_histogram_bins(
    data: Numeric2DArray,
    bin_shape: Numeric1DArrayLike,
    offsets: OptionalNumeric1DArrayLike = None,
):
    """Get histogram bins and ranges given a bin shape and data.

    The method takes into account the data max and min values in each
    dimension and the bin shape to calculate the number of bins and the
    ranges to be used for a histogram. Half a bin is added at each extremum
    so that no bin edge falls exactly on a data extremum.

    Parameters
    ----------
    data : Numeric2DArray
        Data to be used to get the histogram bins.
    bin_shape : Numeric1DArrayLike
        Bin shape (in each dimension) to be used.
    offsets : OptionalNumeric1DArrayLike, optional
        Offsets to be added to the ranges, by default ``None``.

    Returns
    -------
    (Numeric1DArray, Numeric2DArray)
        Number of bins per dimension and ranges (min, max) per dimension.

    """
    _, dim = data.shape
    # calculate the margins which are added to the range in order
    # to fit a number of bins that is integer
    margins = [
        (
            bin_shape[i]
            - float(
                Decimal(float(data[:, i].max() - data[:, i].min()))
                % Decimal(float(bin_shape[i]))
            )
        )
        / 2.0
        for i in range(dim)
    ]
    # add base margins
    ranges = [
        [data[:, i].min() - margins[i], data[:, i].max() + margins[i]]
        for i in range(dim)
    ]
    if offsets is not None:
        ranges = [
            [ranges[i][0] + offsets[i], ranges[i][1] + offsets[i]] for i in range(dim)
        ]
    bins = [round((ranges[i][1] - ranges[i][0]) / bin_shape[i]) for i in range(dim)]
    return np.array(bins), np.array(ranges)

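# Illustrative sketch (not part of the scludam API): the returned ranges
# are padded so the data span fits an integer number of bins of the
# requested shape.
def _example_get_histogram_bins():
    rng = np.random.default_rng(0)
    data = rng.uniform(0, 10, size=(1000, 2))
    bins, ranges = get_histogram_bins(data, np.array([1.0, 1.0]))
    print(bins)    # number of bins per dimension
    print(ranges)  # padded (min, max) per dimension
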
@beartype
def histogram(
    data: Numeric2DArray,
    bin_shape: Numeric1DArrayLike,
    offsets: OptionalNumeric1DArrayLike = None,
):
    """Get a histogram given a bin shape and data.

    Uses the :func:`~scludam.detection.get_histogram_bins` results to
    create an n-dimensional histogram.

    Parameters
    ----------
    data : Numeric2DArray
        Data to be used.
    bin_shape : Numeric1DArrayLike
        Bin shape (in each dimension) to be used.
    offsets : OptionalNumeric1DArrayLike, optional
        Offsets to shift the edges of the histogram, by default ``None``.

    Returns
    -------
    (NDArray, NDArray)
        Histogram and edges.

    """
    _, dim = data.shape
    bins, ranges = get_histogram_bins(data, bin_shape, offsets)
    hist, edges = np.histogramdd(data, bins=bins, range=ranges, density=False)
    return hist, edges

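# Illustrative sketch (not part of the scludam API): histogram() wraps
# np.histogramdd with the bin counts and ranges from get_histogram_bins.
def _example_histogram():
    rng = np.random.default_rng(0)
    data = rng.normal(0, 1, size=(500, 2))
    hist, edges = histogram(data, np.array([0.5, 0.5]))
    print(hist.shape)                  # bins per dimension
    print(len(edges), edges[0].shape)  # one edge array per dimension
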
@beartype
def nyquist_offsets(bin_shape: Numeric1DArrayLike):
    """Get offsets for shifting a histogram.

    Get all possible offsets for a given bin shape, to be used to shift the
    histogram edges following the Nyquist spatial sampling interval. The
    offsets are calculated as half of the bin shape.

    Parameters
    ----------
    bin_shape : Numeric1DArrayLike
        Bin shape (in each dimension).

    Returns
    -------
    Numeric2DArray
        Array of shape (n_combinations, n_dimensions) with the possible
        offsets.

    """
    dim = len(bin_shape)
    if not dim:
        return []
    values = np.vstack((np.array(bin_shape) / 2, np.zeros(dim))).T
    combinations = np.array(np.meshgrid(*values)).T.reshape((-1, dim))
    return np.flip(combinations, axis=0)

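# Illustrative sketch (not part of the scludam API): for an n-dimensional
# bin shape there are 2**n offset combinations, mixing a zero and a
# half-bin shift per dimension.
def _example_nyquist_offsets():
    offsets = nyquist_offsets(np.array([1.0, 2.0]))
    print(offsets.shape)  # (4, 2)
    # the 4 rows are the combinations of {0, 0.5} for the first dimension
    # and {0, 1.0} for the second
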
def _are_indices_adjacent(a: Numeric1DArrayLike, b: Numeric1DArrayLike):
    return np.all(np.abs(np.asarray(a) - np.asarray(b)) <= 1)

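# Illustrative sketch (not part of the scludam API): two histogram indices
# are considered the same peak if they differ by at most one bin in every
# dimension.
def _example_are_indices_adjacent():
    print(_are_indices_adjacent((3, 4), (4, 5)))  # True
    print(_are_indices_adjacent((3, 4), (5, 4)))  # False: differs by 2
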
@beartype
def extend_1dmask(mask: ArrayLike, dim: int):
    """Extend a 1-dimensional filtering mask to an n-dimensional mask.

    From a numeric 1D filtering mask, the function uses the outer product
    to extend it to an n-dimensional one. The resulting mask is the
    combination of n 1D masks orthogonal to each other. The sum of the
    resulting mask is equal to 1.

    Parameters
    ----------
    mask : ArrayLike
        1D mask to be extended.
    dim : int
        Dimension the mask is to be extended to.

    Returns
    -------
    NDArray
        Extended mask.

    Examples
    --------
    .. literalinclude:: ../../examples/detection/extend_1dmask.py
        :language: python
        :linenos:

    .. image:: ../../examples/detection/extend_1dmask.png

    """
    m1 = np.asarray(mask)
    mi = m1
    for i in range(dim - 1):
        mi = np.multiply.outer(mi, m1)
    return mi / mi.sum()

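# Illustrative sketch (not part of the scludam API): a 1D kernel extended
# to 2D by the outer product; the result is normalized so it sums to 1.
def _example_extend_1dmask():
    m2 = extend_1dmask(np.array([1.0, 2.0, 1.0]), 2)
    print(m2.shape)  # (3, 3)
    print(m2.sum())  # 1.0
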
def _get_higher_score_offset_per_peak(indices: List, scores: List):
    indices = indices.copy()
    scores = scores.copy()
    if not len(indices) or not len(scores):
        return []
    if len(scores) != len(indices):
        raise ValueError("indices and scores must have the same length")
    peaks = list(zip(list(range(len(indices))), indices, scores))
    best = [peaks[0]]
    peaks = peaks[1:]
    while len(peaks) > 0:
        peak = peaks.pop(0)
        ii, iindex, iscore = peak
        j = 0
        while j < len(best):
            ji, jindex, jscore = best[j]
            if ji != ii and _are_indices_adjacent(iindex, jindex):
                if iscore > jscore:
                    # keep same jindex but update peak
                    # so it does not follow a path of
                    # adjacent best peaks indefinitely
                    best[j] = [ii, jindex, iscore]
                break
            j += 1
        if j == len(best):
            best.append(peak)
    best = sorted(best, key=lambda x: x[2], reverse=True)
    return np.array([p[0] for p in best])

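# Illustrative sketch (not part of the scludam API): when the same peak is
# found at adjacent indices in differently shifted histograms, only the
# occurrence with the highest score is kept.
def _example_get_higher_score_offset_per_peak():
    indices = [np.array([10, 10]), np.array([10, 11]), np.array([3, 3])]
    scores = [1.5, 2.5, 2.0]
    keep = _get_higher_score_offset_per_peak(indices, scores)
    print(keep)  # [1 2]: peak 1 wins over adjacent peak 0; peak 2 is kept
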
@define
class DetectionResult:
    """Result of a detection run.

    Attributes
    ----------
    centers : Numeric2DArray
        Centers of the detected peaks. They are calculated using the sigma
        clipped median over the data delimited by the ``edges``.
    edges : Numeric2DArray
        Edges that delimit the data used to calculate the centers. One bin
        shape is taken in each direction, in each dimension.
    scores : Numeric1DArray
        Scores of the detected peaks. The results are sorted by score in
        descending order.
    counts : Numeric1DArray
        Number of data points in the bin that represents the peak.
    sigmas : Numeric1DArray
        Sigma of the detected peaks. Currently arbitrarily set as the bin
        shape in each dimension.
    offsets : Numeric2DArray
        Offsets used for detecting each peak.
    indices : Numeric2DArray
        Indices of the detected peaks in the histogram.

    """

    centers: Numeric2DArray = np.array([])
    sigmas: NDArray[np.number] = np.array([])
    scores: NDArray[np.number] = np.array([])
    counts: NDArray[np.number] = np.array([])
    edges: Numeric2DArray = np.array([])
    offsets: Numeric2DArray = np.array([])
    indices: Numeric2DArray = np.array([])

# TODO: add a plot for the result
@define
class CountPeakDetector:
    """Count peak detector class.

    Uses an n-dimensional histogram (array) to detect density peaks in the
    input data.

    Attributes
    ----------
    bin_shape : Numeric1DArrayLike
        Bin shape (in each dimension) to be used to create the histogram.
    mask : OptionalArrayLike, optional
        Mask to be used in the filtering operations, by default uses
        :func:`~scludam.detection.default_mask` with the data dimensions.
        The mask must have the same dimensions as the data, and its weights
        must sum to 1 and be appropriate for smoothing.
    nyquist_offset : bool, optional
        If ``True``, the Nyquist spatial sampling interval is used to shift
        the histogram edges, by default ``True``. It helps to avoid
        underestimating the bin count due to an arbitrarily chosen bin edge
        shift. It uses :func:`~scludam.detection.nyquist_offsets`.
    min_count : Number, optional
        Minimum count for a bin to be eligible as a peak, by default 10.
        Also used by ``remove_low_density_regions`` if that option is
        enabled.
    remove_low_density_regions : bool, optional
        If ``True``, low density bins are removed from the histogram, by
        default ``True``. It removes low density bins from the edges of the
        histogram, trimming down the region of interest and reducing the
        size of the histogram, which in turn reduces memory usage for
        sparse data. It uses the ``min_count`` value as the threshold. It
        also keeps bins that are in the neighborhood of a valid (dense) bin
        so the filtering operation can be applied to the remaining bins
        correctly. The neighborhood is defined by the size of the ``mask``
        to be used for the filtering operations.
    min_dif : Number, optional
        Minimum difference between the bin count and the background for a
        bin to be eligible as a peak, by default 10. The formula used is:
        ``eligible if histogram - background > min_dif``, where
        ``background`` is obtained by filtering the histogram with the
        provided ``mask``.
    min_sigma_dif : Number, optional
        Sigma value used to calculate the difference between the bin count
        and the background for a bin to be eligible as a peak, by default
        ``None`` (deactivated). The formula used is:
        ``eligible if histogram - background > min_sigma_dif * std``, where
        ``background`` is obtained by filtering the histogram with the
        provided ``mask`` and ``std`` is the standard deviation in a window
        surrounding the bin, calculated according to the ``norm_mode``
        parameter.
    min_score : Number, optional
        Minimum score for a bin to be eligible as a peak, by default 2. The
        score is calculated as the standardized difference between the bin
        count and the background:
        ``score = (histogram - background) / std``, where ``background`` is
        obtained by filtering the histogram with the provided ``mask`` and
        ``std`` is calculated according to the ``norm_mode`` parameter.
    max_n_peaks : Number, optional
        Maximum number of peaks to be detected, by default 10. Use
        ``np.inf`` to detect all peaks.
    min_interpeak_dist : int, optional
        Minimum number of bins between peaks, by default 1.
    norm_mode : str, optional
        Mode to be used to get the standard deviation used in the score
        calculation, by default "std". Can be one of the following:

        #. "std": standard deviation of ``histogram - background``,
           calculated using the provided ``mask`` and
           :func:`~scludam.detection.fast_std_filter`.
        #. "approx": approximation [1]_ to the standard deviation, taking
           into account how ``sharp = histogram - background`` is obtained:

           * A common estimate of the standard deviation of a histogram is
             the square root of the bin count: ``std(h) = sqrt(h)``.
           * According to uncertainty propagation, if ``s = h - b``, then
             ``std(s) = sqrt(var(h) + var(b) - 2*cov(h,b))``.
           * Considering ``2*cov(h,b) ~ 0``, the approximation is:
             ``std(s) = sqrt(h + b)``.
    select_index : int, optional
        If set, the detection process only returns the peak with the index
        provided, by default ``None``.

    Notes
    -----
    The algorithm used is based on the following steps:

    #. Remove the data corresponding to low density regions, from the edges
       towards the center, until a dense enough bin is found, as described
       in the ``remove_low_density_regions`` parameter.
    #. Calculate all possible offsets for the histogram edges, using the
       Nyquist spatial sampling interval. The region surveyed is subdivided
       into a rectilinear grid of overlapping hypercubes separated by half
       the side length of an individual bin [2]_ [3]_ [4]_.
    #. Instead of creating one histogram including all possible offsets,
       which can be very large when dimensionality increases, a histogram
       is created for each possible offset. Per histogram, the following
       steps are performed:

       #. Estimate the background density, convolving the histogram with
          the provided ``mask``, smoothing the histogram over adjacent bins
          inside a window defined by the mask size [2]_ [5]_ [6]_.
       #. Calculate the excess of data points in each bin as the difference
          between the bin count and the background density. This is
          equivalent to applying a high-pass filter to the histogram. It
          should be noted that the excess count obtained with this method
          can be poorly estimated, especially when the bin shape used is
          not appropriate.
       #. Calculate the score of each bin as the normalized excess count,
          using the methods described in the ``norm_mode`` parameter.
       #. Apply the ``min_count``, ``min_sigma_dif``, ``min_dif`` and
          ``min_interpeak_dist`` constraints and find peaks in the
          n-dimensional score histogram.

    #. Take the peaks found in each shifted histogram and merge them into a
       single list, keeping only the highest-score shift for each peak. The
       list is sorted by score in descending order.

    The fundamental parameter of the method is ``bin_shape``. In general,
    the shape must be chosen as the span in each dimension of the object to
    be detected.

    References
    ----------
    .. [2] Schmeja, S. (2011). Identifying star clusters in a field: A
        comparison of different algorithms. Astronomische Nachrichten, 332,
        172-184. doi: 10.1002/asna.201011484
    .. [3] Lada, E. A., Lada, C. J. (1995). Near-infrared images of IC 348
        and the luminosity functions of young embedded star clusters. The
        Astronomical Journal, 109.
    .. [4] Nanda Kumar, M. S., Kamath, U. S., and Davis, C. J. (2004).
        Embedded star clusters in the W51 giant molecular cloud. Monthly
        Notices of the Royal Astronomical Society, 353, 1025-1034.
        doi:10.1111/j.1365-2966.2004.08143.x
    .. [5] Lada, E. A., DePoy, D. L., Evans, N. J., and Gatley, I. (1991).
        A 2.2 micron survey in the L1630 molecular cloud. The Astrophysical
        Journal, 371, 171-182.
    .. [6] Karampelas, A., Dapergolas, A., Kontizas, E., Livanou, E.,
        Kontizas, M., Bellas-Velidis, I., and Vílchez, J. M. (2009). Star
        complexes and stellar populations in NGC 6822: Comparison with the
        Magellanic Clouds. Astronomy and Astrophysics, 497, 703-711.

    Examples
    --------
    .. literalinclude:: ../../examples/detection/count_peak_detector.py
        :language: python
        :linenos:

    .. image:: ../../examples/detection/count_peak_detector.png

    """

    bin_shape: Numeric1DArrayLike = field(validator=_type(Numeric1DArrayLike))
    mask: OptionalArrayLike = field(default=None, validator=_type(OptionalArrayLike))
    nyquist_offset: bool = field(default=True)
    min_count: Number = field(default=10)
    min_dif: Number = field(default=10)
    min_sigma_dif: Number = field(default=None)
    min_score: Number = field(default=2)
    max_n_peaks: int = field(default=10)
    select_index: int = field(default=None)
    min_interpeak_dist: int = field(default=1, validator=_type(int))
    remove_low_density_regions: bool = field(default=True, validator=_type(bool))
    norm_mode: str = field(default="std", validator=validators.in_(["std", "approx"]))
    _offsets: OptionalArrayLike = None
    _last_result: DetectionResult = None
    _data: Numeric2DArray = None

    def _remove_low_density_regions(self, data: Numeric2DArray):
        obs, dim = data.shape
        # calculate data ranges in each dimension, taking into account
        # that bin number must be integer
        _, ranges = get_histogram_bins(data, self.bin_shape)
        for i in range(dim):
            shifts = [self.bin_shape[i], -self.bin_shape[i]]
            for j in range(2):
                while True:
                    if ranges[i][0] >= ranges[i][1]:
                        raise ValueError(
                            """No bin passed minimum density check.
                            Check min_count parameter."""
                        )
                    slice_ranges = np.copy(ranges)
                    # if j = 0 -> upper limit = lower limit + bin shape
                    # if j = 1 -> lower limit = upper limit - bin shape
                    slice_ranges[i][int(not (j))] = slice_ranges[i][j] + shifts[j]
                    data_slice = data[RangeMasker(limits=slice_ranges).mask(data)]
                    if data_slice.shape[0] != 0:
                        slice_histogram, _ = histogram(data_slice, self.bin_shape)
                        # if any bin has min required count, stop trimming
                        if np.any(slice_histogram >= self.min_count):
                            break
                    # else, move limit towards the center and continue
                    ranges[i][j] = slice_ranges[i][int(not (j))]

        # extend ranges half mask shape in each direction so data that
        # belongs to an invalid bin can contribute in a border valid bin
        # when the mask is applied
        mask_shape = np.array(self.mask.shape)
        half_mask_shape = np.floor(mask_shape / 2)
        ranges[:, 0] = ranges[:, 0] - half_mask_shape * self.bin_shape
        ranges[:, 1] = ranges[:, 1] + half_mask_shape * self.bin_shape

        # trim data and return
        trimmed_data = data[RangeMasker(limits=ranges).mask(data)]
        return trimmed_data

    def _set_nyquist_offsets(self):
        dim = len(self.bin_shape)
        if not self.nyquist_offset:
            self._offsets = np.atleast_2d(np.zeros(dim))
        else:
            values = np.vstack((np.array(self.bin_shape) / 2, np.zeros(dim))).T
            combinations = np.array(np.meshgrid(*values)).T.reshape((-1, dim))
            self._offsets = np.flip(combinations, axis=0)
    @beartype
    def detect(self, data: Numeric2DArray):
        """Detect peaks in the provided data.

        Uses the configuration provided in the class attributes.

        Parameters
        ----------
        data : Numeric2DArray
            Numerical data to be used.

        Returns
        -------
        DetectionResult
            Instance containing the detected peaks.

        Raises
        ------
        ValueError
            If ``remove_low_density_regions`` is used and no bin passes the
            density check, the ``min_count`` is probably too low.
        ValueError
            If ``data``, ``bin_shape`` and ``mask`` dimensions do not
            match.

        Warns
        -----
        UserWarning
            If the histogram has too few bins in some dimension, the
            filtering operations can still be applied but are prone to
            border effects.

        """
        if len(data.shape) != 2:
            raise ValueError("data array must have 2 dimensions")
        obs, dim = data.shape

        # mask setup
        if self.mask is None:
            self.mask = default_mask(dim)
        self.mask = np.array(self.mask)
        mask = self.mask

        # check mask and bin shape are compatible
        self.bin_shape = np.array(self.bin_shape)
        if len(mask.shape) != dim:
            raise ValueError("mask does not match data dimensions")
        if len(self.bin_shape) != dim:
            raise ValueError("bin_shape does not match data dimensions")

        # remove points in low density regions
        if self.remove_low_density_regions and self.min_count:
            data = self._remove_low_density_regions(data)
        self._data = data

        # set nyquist offsets
        self._set_nyquist_offsets()

        # get histogram ranges and bin numbers
        bins, ranges = get_histogram_bins(data, self.bin_shape)

        # set peak detection parameters
        peak_detection_params = {}
        if np.any(np.array(bins) < np.array(mask.shape)):
            warn(
                f"Histogram has too few bins in some dimensions: bin numbers are {bins}"
            )
            peak_detection_params["exclude_border"] = False
        else:
            peak_detection_params["exclude_border"] = True
        if self.min_interpeak_dist:
            peak_detection_params["min_distance"] = self.min_interpeak_dist
        if self.min_score:
            peak_detection_params["threshold_abs"] = self.min_score
        if self.max_n_peaks:
            peak_detection_params["num_peaks"] = self.max_n_peaks

        # detection
        g_centers = []
        g_scores = []
        g_edges = []
        g_sigmas = []
        g_counts = []
        g_indices = []
        g_offsets = []

        for offset in self._offsets:
            hist, edges = histogram(data, self.bin_shape, offset)
            smoothed = _convolve(hist, mask=mask)
            sharp = hist - smoothed

            if self.norm_mode == "approx":
                # approx explanation:
                # err_hist = sqrt(hist), because the estimate of the std of
                # a bin count is std(vhat) = sqrt(vhat) = sqrt(n): error
                # bars are given by the square root of the number of
                # entries in each bin of the histogram
                # err_smoothed = std(smoothed) = sqrt(smoothed)
                # sharp = hist - smoothed, so
                # err_sharp = sqrt(err_hist^2 + err_smoothed^2 - ...)
                # because std(a - b) = sqrt(var(a) + var(b) - 2*cov(a,b)),
                # according to:
                # https://en.wikipedia.org/wiki/Propagation_of_uncertainty
                # with cov(a,b) = std(a) * std(b) * corrcoef(a,b)
                # as normalized = sharp / err_sharp,
                # +1 is added to avoid zero division errors
                std = np.sqrt(smoothed + hist + 1)
            elif self.norm_mode == "std":
                # directly getting the std
                std = fast_std_filter(sharp, mask=mask) + 1

            normalized = sharp / std
            # TODO: remove, other way of getting the std approx
            # n4 = sharp / np.sqrt(
            #     std**2 + fast_std_filter(smoothed, mask=mask)**2 + 1
            # )

            detection_img = np.copy(normalized)
            if self.min_dif is not None:
                detection_img[sharp < self.min_dif] = 0
            if self.min_sigma_dif is not None:
                detection_img[sharp < self.min_sigma_dif * std] = 0

            clusters_idx = peak_local_max(detection_img, **peak_detection_params).T
            _, peak_count = clusters_idx.shape

            if peak_count != 0:
                iter_indcs = clusters_idx.T
                iter_counts = sharp[tuple(clusters_idx)]
                iter_scores = normalized[tuple(clusters_idx)]
                limits = [
                    [
                        (
                            edges[i][clusters_idx[i][j]] - self.bin_shape[i],
                            edges[i][clusters_idx[i][j]] + self.bin_shape[i],
                        )
                        for i in range(dim)
                    ]
                    for j in range(peak_count)
                ]
                iter_edges = limits
                subsets = [
                    data[RangeMasker(limits=limits[j]).mask(data)]
                    for j in range(peak_count)
                ]
                # stats may be useless if other center and sigma are
                # calculated afterwards, e.g. meanshift and profile
                # analysis
                statistics = np.array(
                    [
                        [
                            sigma_clipped_stats(
                                subsets[j][:, i],
                                cenfunc="median",
                                stdfunc="mad_std",
                                maxiters=None,
                                sigma=1,
                            )
                            for i in range(dim)
                        ]
                        for j in range(peak_count)
                    ]
                )
                iter_centers = statistics[:, :, 1]

                g_indices += iter_indcs.tolist()
                g_centers += iter_centers.tolist()
                g_scores += iter_scores.tolist()
                g_edges += iter_edges
                g_sigmas += [np.array(self.bin_shape) for _ in range(peak_count)]
                g_counts += iter_counts.tolist()
                g_offsets += [offset for _ in range(peak_count)]

        if len(g_indices) == 0:
            self._last_result = DetectionResult()
            return deepcopy(self._last_result)

        # compare same peaks in different histogram offsets
        # and keep the most significant peak over all offsets
        g_ind = _get_higher_score_offset_per_peak(g_indices, g_scores)
        if self.max_n_peaks != np.inf:
            g_ind = g_ind[0 : self.max_n_peaks]

        g_centers = np.array(g_centers)[g_ind]
        g_scores = np.array(g_scores)[g_ind]
        g_edges = np.array(g_edges)[g_ind]
        g_sigmas = np.array(g_sigmas)[g_ind]
        g_indices = np.array(g_indices)[g_ind]
        g_counts = np.array(g_counts)[g_ind]
        g_offsets = np.array(g_offsets)[g_ind]

        # if select_index is set, return only the selected peak
        if self.select_index is not None:
            g_centers = np.array([g_centers[self.select_index]])
            g_scores = np.array([g_scores[self.select_index]])
            g_edges = np.array([g_edges[self.select_index]])
            g_sigmas = np.array([g_sigmas[self.select_index]])
            g_indices = np.array([g_indices[self.select_index]])
            g_counts = np.array([g_counts[self.select_index]])
            g_offsets = np.array([g_offsets[self.select_index]])

        self._last_result = DetectionResult(
            centers=g_centers,
            sigmas=g_sigmas,
            scores=g_scores,
            counts=g_counts,
            edges=g_edges,
            offsets=g_offsets,
            indices=g_indices,
        )
        # to avoid any kind of array change issue
        return deepcopy(self._last_result)
    @beartype
    def plot(
        self,
        peak: int = 0,
        x: int = 0,
        y: int = 1,
        mode: str = "c",
        cols: Union[List[str], None] = None,
        cut_label_prec: int = 4,
        center_label_prec: int = 4,
        **kwargs,
    ):
        """Create a plot of the individual peaks detected.

        Creates the plot using the result of the last
        :func:`~scludam.detection.CountPeakDetector.detect` call. Returns a
        custom seaborn heatmap plot and passes any kwargs to the plot
        function. The heatmap is a two dimensional histogram slice, where x
        and y can be set and the rest of the dimensions are fixed at the
        peak center.

        Parameters
        ----------
        peak : int, optional
            Index of the peak to be plotted in the result array, by
            default 0.
        x : int, optional
            Index of the variable to be placed as the first dimension, by
            default 0.
        y : int, optional
            Index of the variable to be placed as the second dimension, by
            default 1.
        mode : str, optional
            Histogram type, by default "c". Can be one of:

            #. "c": Counts histogram.
            #. "b": Background histogram.
            #. "e": Excess histogram.
            #. "s": Score histogram.

            The meaning of each histogram can be inferred from the method
            explained in :class:`~scludam.detection.CountPeakDetector`.
        cols : Union[List[str], None], optional
            List of variable names, by default ``None``. If ``None``, the
            variables are called 'var1', 'var2', and so on.
        cut_label_prec : int, optional
            Decimal places for the cut message in the title, by default 4.
        center_label_prec : int, optional
            Decimal places for the center message in the title, by
            default 4.

        Returns
        -------
        matplotlib.axes._subplots.AxesSubplot
            Plot of the peak.

        Raises
        ------
        ValueError
            If no results are available.
        ValueError
            If no peaks were detected in the last result.
        ValueError
            If the peak index is invalid.
        ValueError
            If the mode is invalid.
        ValueError
            If the x or y dimensions are invalid.
        ValueError
            If the label list has an invalid length.

        Examples
        --------
        .. literalinclude:: ../../examples/detection/plot.py
            :language: python
            :linenos:

        .. image:: ../../examples/detection/plot1.png

        .. image:: ../../examples/detection/plot2.png

        """
        if self._last_result is None:
            raise ValueError("No result available, run detect function first.")
        if self._last_result.centers.size == 0:
            raise ValueError("No peaks detected in last run.")
        if self._last_result.centers.shape[0] <= peak:
            raise ValueError(f"No peak with index {peak} detected in last run.")
        if mode not in ["c", "b", "e", "s"]:
            raise ValueError("Mode must be one of 'c', 'b', 'e' or 's'.")

        pindex = self._last_result.indices[peak]
        pcenter = self._last_result.centers[peak]

        hist, edges = histogram(
            self._data, self.bin_shape, self._last_result.offsets[peak]
        )

        # duplicated code, pay attention if the method is changed
        # in the detect function
        if mode != "c":
            smoothed = _convolve(hist, mask=self.mask)
            if mode != "b":
                sharp = hist - smoothed
                if mode != "e":
                    if self.norm_mode == "approx":
                        std = np.sqrt(smoothed + hist + 1)
                    elif self.norm_mode == "std":
                        std = fast_std_filter(sharp, mask=self.mask) + 1
                    normalized = sharp / std
                    hist = normalized
                else:
                    hist = sharp
            else:
                hist = smoothed

        dim = len(self.bin_shape)
        dims = np.arange(dim)
        if x not in dims or y not in dims:
            raise ValueError("x and y must be valid dimensions.")
        if cols is None:
            cols = np.array([f"var{i+1}" for i in range(dim)], dtype="object")
        elif len(cols) != dim:
            raise ValueError("cols must have n_dim elements.")

        # flip xy order because heatmap plots yx instead of xy
        xydims = np.flip(dims[[x, y]])
        cutdims = np.array(list(set(dims) - set(xydims)))
        # transpose the axes so xy are first
        hist = np.transpose(hist, axes=list(xydims) + list(cutdims))

        # create a 2d cut for (x,y) with the other dims fixed
        # on the peak value
        if len(hist.shape) <= 2:
            hist2D = hist
        else:
            cut = np.array(
                [slice(None)] * 2 + pindex[cutdims].tolist(), dtype="object"
            )
            hist2D = hist[tuple(cut)]

        # get the edges of the 2d cut in the xy order
        edges2D = np.array(edges, dtype="object")[xydims]
        assert hist2D.shape[0] == edges2D[0].shape[0] - 1
        # get the peak indices in the 2d cut in the xy order
        pindex2D = pindex[xydims]
        # get the bin_shape for xy in the correct order
        bin_shape = self.bin_shape.copy()[xydims]

        hm = heatmap2D(
            hist2D=hist2D, edges=edges2D, bin_shape=bin_shape, index=pindex2D, **kwargs
        )
        hm.axes.set_xlabel(cols[x])
        hm.axes.set_ylabel(cols[y])

        cut_values = [round(pcenter[i], cut_label_prec) for i in dims]
        cut_edges = [round(self.bin_shape[i] / 2, cut_label_prec) for i in dims]
        cut_string = ", ".join(
            [f"{cols[i]}={cut_values[i]}±{cut_edges[i]}" for i in cutdims]
        )
        mode_string = {
            "c": "Count histogram",
            "b": "Background histogram",
            "e": "Excess histogram",
            "s": "Score histogram",
        }.get(mode, "Count histogram")
        pcenter_string = ", ".join(
            [f"{cols[i]}={round(pcenter[i], center_label_prec)}" for i in dims]
        )
        hm.title.set_style("italic")
        hm.title.set_text(
            mode_string
            + " sliced at "
            + cut_string
            + "\npeak"
            + str(peak)
            + "=("
            + pcenter_string
            + ")"
        )
        return hm
    def lineplot(self, **kwargs):
        """Plot the last result as a line plot.

        Plots scores and counts for all peaks found.

        Returns
        -------
        AxesSubplot
            Line plot of the scores and counts.

        Raises
        ------
        ValueError
            If no results are available.

        """
        if self._last_result is None:
            raise ValueError("No result available, run detect function first.")
        return horizontal_lineplots(
            ys=[self._last_result.scores, self._last_result.counts],
            cols=["scores", "counts"],
            **kwargs,
        )
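

# Illustrative end-to-end sketch (not part of the scludam API): detect an
# overdensity in synthetic 2D data. The field size, cluster position, and
# cluster spread below are arbitrary choices made up for this example.
def _example_count_peak_detector():
    rng = np.random.default_rng(0)
    # uniform field stars plus a compact gaussian cluster at (1, -1)
    field_stars = rng.uniform(-5, 5, size=(2000, 2))
    cluster = rng.normal(loc=(1.0, -1.0), scale=0.25, size=(200, 2))
    data = np.concatenate([field_stars, cluster])
    # bin_shape chosen roughly as the span of the object to be detected
    detector = CountPeakDetector(bin_shape=np.array([1.0, 1.0]))
    result = detector.detect(data)
    print(result.centers)  # approximate peak centers, best first
    print(result.scores)   # scores in descending order
    # detector.plot() would show a heatmap slice around the best peak and
    # detector.lineplot() the scores and counts of all peaks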