# scludam, Star CLUster Detection And Membership estimation package
# Copyright (C) 2022 Simón Pedro González
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Module for synthetic data generation.
Contains some helpful functions and distributions, and the main classes for generating
synthetic sky region samples with a star field and star clusters.
"""
from numbers import Number
from typing import List, Union
import astropy.units as u
import numpy as np
import pandas as pd
from astropy.coordinates import Distance, SkyCoord
from attrs import Factory, define, field, validators
from beartype import beartype
from numpy.typing import NDArray
from scipy import stats
from scipy.spatial import ConvexHull
from scludam.type_utils import (
Numeric1DArray,
Numeric2DArray,
Vector2,
Vector2Array,
Vector3,
Vector3Array,
_type,
)
# Helper functions
@beartype
def is_inside_circle(
center: Vector2, radius: Number, data: Union[Vector2, Vector2Array]
) -> NDArray[bool]:
"""Check if data is inside a circle.
Parameters
----------
center : Vector2
list, tuple or numpy array of 2 numeric elements representing the center
of the circle
radius : Number
radius of the circle
data : Union[Vector2, Vector2Array]
numeric array of shape (n, 2), or a single 2-element point, to check against the circle
Returns
-------
NDArray[bool]
mask indicating if data is inside the circle.
"""
data = np.array(data)
if data.ndim == 1:
data = data[np.newaxis, :]
dx = np.abs(data[:, 0] - center[0])
dy = np.abs(data[:, 1] - center[1])
return (
(dx <= radius)
& (dy <= radius)
& ((dx + dy <= radius) | (dx**2 + dy**2 <= radius**2))
)
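# Illustrative usage sketch (not part of the library API, values chosen only for
# the example): the mask marks which rows of an (n, 2) array fall inside a circle
# of radius 1 centred at the origin; (0.5, 0.5) passes the cheap Manhattan check,
# while (1, 1) fails both checks.
# >>> is_inside_circle((0, 0), 1, np.array([[0.0, 0.0], [0.5, 0.5], [1.0, 1.0]]))
# array([ True,  True, False])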
@beartype
def is_inside_sphere(
center: Vector3, radius: Number, data: Union[Vector3, Vector3Array]
) -> NDArray[bool]:
"""Check if data is inside a sphere.
Parameters
----------
center : Vector3
list, tuple or numpy array of 3 numeric elements representing the center
of the sphere
radius : Number
radius of the sphere
data : Union[Vector3, Vector3Array]
numeric array of shape (n, 3), or a single 3-element point, to check against the sphere
Returns
-------
NDArray[bool]
mask indicating if data is inside the sphere.
"""
data = np.array(data)
if data.ndim == 1:
data = data[np.newaxis, :]
dx = np.abs(data[:, 0] - center[0])
dy = np.abs(data[:, 1] - center[1])
dz = np.abs(data[:, 2] - center[2])
return dx**2 + dy**2 + dz**2 <= radius**2
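# The same kind of sketch for the 3-D helper, with a unit sphere at the origin:
# >>> is_inside_sphere((0, 0, 0), 1, np.array([[0.0, 0.0, 0.5], [1.0, 1.0, 1.0]]))
# array([ True, False])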
# Coordinate transformation
@beartype
def cartesian_to_polar(
coords: Union[Vector3, Vector3Array]
) -> Union[Vector3, Vector3Array]:
"""Convert cartesian coordinates to polar coordinates.
Cartesian coordinates are taken as (x, y, z) in parsecs in
ICRS system and are transformed to (ra, dec, parallax).
Parameters
----------
coords : Union[Vector3, Vector3Array]
cartesian coordinates in (x, y, z) in parsecs in ICRS system
to be transformed to (ra, dec, parallax).
Returns
-------
Union[Vector3, Vector3Array]
polar coordinates in (ra, dec, parallax).
"""
coords = np.array(coords)
if len(coords.shape) == 1:
coords = SkyCoord(
x=coords[0],
y=coords[1],
z=coords[2],
unit="pc",
representation_type="cartesian",
frame="icrs",
)
coords.representation_type = "spherical"
return np.array([coords.ra.deg, coords.dec.deg, coords.distance.parallax.mas])
else:
coords = SkyCoord(
x=coords[:, 0],
y=coords[:, 1],
z=coords[:, 2],
unit="pc",
representation_type="cartesian",
frame="icrs",
)
coords.representation_type = "spherical"
return np.vstack(
(coords.ra.deg, coords.dec.deg, coords.distance.parallax.mas)
).T
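# A quick sanity check (illustrative only, assuming a plain 3-element sequence is
# accepted as a Vector3): a point 100 pc away along the ICRS x axis maps to
# ra = 0 deg, dec = 0 deg and a parallax of 1000 / 100 = 10 mas, up to floating
# point rounding.
# >>> cartesian_to_polar([100.0, 0.0, 0.0])   # ~ array([ 0.,  0., 10.])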
@beartype
def polar_to_cartesian(
coords: Union[Vector3, Vector3Array]
) -> Union[Vector3, Vector3Array]:
"""Convert polar coordinates to cartesian coordinates.
Polar coordinates are taken as (ra, dec, parallax) in (degree, degree, mas)
in ICRS system and are transformed to (x, y, z) in parsecs in ICRS system.
Parameters
----------
coords : Union[Vector3, Vector3Array]
polar coordinates in (ra, dec, parallax) in (degree, degree, mas)
in ICRS system.
Returns
-------
Union[Vector3, Vector3Array]
cartesian coordinates in (x, y, z) in parsecs in ICRS system.
"""
coords = np.array(coords)
if len(coords.shape) == 1:
coords = SkyCoord(
ra=coords[0] * u.degree,
dec=coords[1] * u.degree,
distance=Distance(parallax=coords[2] * u.mas),
representation_type="spherical",
frame="icrs",
)
coords.representation_type = "cartesian"
return np.array([coords.x.value, coords.y.value, coords.z.value])
else:
coords = SkyCoord(
ra=coords[:, 0] * u.degree,
dec=coords[:, 1] * u.degree,
distance=Distance(parallax=coords[:, 2] * u.mas),
representation_type="spherical",
frame="icrs",
)
coords.representation_type = "cartesian"
return np.vstack((coords.x.value, coords.y.value, coords.z.value)).T
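# Since the two transforms are mutual inverses (up to floating point error), a
# round trip recovers the original polar coordinates; again a hedged sketch with
# arbitrary values:
# >>> xyz = polar_to_cartesian([120.5, -27.5, 5.0])   # (ra deg, dec deg, plx mas) -> pc
# >>> cartesian_to_polar(xyz)                         # ~ array([120.5, -27.5,   5. ])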
# Custom validators
def _in_range(min_value, max_value):
def range_validator(instance, attribute, value):
if value < float(min_value):
raise ValueError(f"{attribute.name} attribute must be >= than {min_value}")
if value > float(max_value):
raise ValueError(f"{attribute.name} attribute must be <= than {max_value}")
return range_validator
def _dist_has_n_dimensions(n: int):
def _dist_has_n_dimensions_validator(instance, attribute, value):
if value.dim != n:
raise ValueError(
f"{attribute.name} attribute must have {n} dimensions, "
f" but has {value.dim} dimensions"
)
return _dist_has_n_dimensions_validator
# Custom distributions
@define(init=False)
class EDSD(stats.rv_continuous):
"""Class to represent the EDSD distribution.
Attributes
----------
w0 : float
Distribution zero point that indicates the lower limit
wl : float
Parameter that determines the width of the profile and places
its peak at w0 + wl/4
wf : float
Distribution final point that indicates the upper limit
Returns
-------
EDSD:
Distribution object
Raises
------
ValueError
If ``wf < w0``, or if a and b, which determine the evaluation
domain in scipy.rv_continuous, do not satisfy ``a < w0``, ``b > wf``
and ``b > a``.
Notes
-----
Exponentially Decreasing Space Density is used to represent
certain distributions, such as a parallax distribution of a
star catalogue [1]_ [2]_. The EDSD distribution is defined as:
* ``f(w) = wl**3 / (2 * (w - w0)**4) * exp(-wl / (w - w0))`` if ``w > w0`` and ``w < wf``
* ``f(w) = 0`` if ``w <= w0``
* ``f(w) = 0`` if ``w >= wf``
where:
* ``w``: parallax in mas
* ``w0``: distribution zero point that indicates the lower limit
* ``wl``: parameter that determines the width of the profile and places
its peak at w0 + wl/4
* ``wf``: distribution final point that indicates the upper limit
from which the distribution is zero. This is added to make
the function domain limited to ``[w0, wf]``, so other
values outside this range are not evaluated.
References
----------
.. [1] C. A. L. Bailer-Jones et al. (2018). Estimating Distance from Parallaxes,
IV, Distances to 1.33 Billion Stars in Gaia Data Release 2. The Astronomical
Journal, 156:58 (11pp), 2018 August. https://doi.org/10.3847/1538-3881/aacb21
.. [2] Z. Shao & L. Li (2019). Gaia Parallax of Milky Way
Globular Clusters: A Solution
of Mixture Model. https://www.researchgate.net/publication/335233416
Examples
--------
.. literalinclude :: ../../examples/synthetic/edsd.py
:language: python
:linenos:
.. image:: ../../examples/synthetic/edsd.png
"""
w0: float
wl: float
wf: float
def __init__(self, w0: float, wl: float, wf: float, **kwargs):
super().__init__(**kwargs)
self._argcheck(wl, w0, wf)
self.w0 = w0
self.wl = wl
self.wf = wf
def _pdf(self, x, wl, w0, wf):
return np.piecewise(
x,
[(x <= w0) + (x >= wf)],
[0, lambda x: wl**3 / (2 * (x - w0) ** 4) * np.exp(-wl / (x - w0))],
)
def _ppf(self, y, wl, w0, wf):
return (
(-0.007 * wl * 2 + 0.087 * wl - 0.12)
+ w0
+ (0.17 + 0.076 * wl) * y**4
+ (2 - 0.037 * wl + 0.0048 * wl**2)
* wl
/ (((np.log10(-5 / 4 * np.log10(1.0 - y)) - 3) ** 2) - 6)
)
def _cdf(self, x, wl, w0, wf):
return np.piecewise(
x,
[x <= w0, x >= wf],
[
0,
1,
lambda x: (
(
2 * x**2
+ (2 * wl - 4 * w0) * x
+ 2 * w0**2
- 2 * wl * w0
+ wl**2
)
* np.exp(-wl / (x - w0))
/ (2 * (x - w0) ** 2)
),
],
)
def pdf(self, x: Union[Number, Numeric1DArray]) -> Numeric1DArray:
"""Probability density function of the EDSD distribution.
Parameters
----------
x : Union[Number, Numeric1DArray]
Data to be evaluated.
Returns
-------
Numeric1DArray
PDF values.
Notes
-----
The PDF is defined as the density profile given in [1]_ [2]_, but it is
not used for random generation. Instead, a Percent Point Function
approximation is used.
"""
return self._pdf(x, self.wl, self.w0, self.wf)
def cdf(self, x: Union[Number, Numeric1DArray]) -> Numeric1DArray:
"""Cumulative distribution function.
Parameters
----------
x : Numeric1DArray
Returns
-------
Numeric1DArray
Cumulative distribution function.
Notes
-----
The CDF is a polynomial approximation of the real CDF
function.
"""
return self._cdf(x, self.wl, self.w0, self.wf)
def ppf(self, y: Union[Number, Numeric1DArray]) -> Numeric1DArray:
"""Percent point function.
Parameters
----------
y : Numeric1DArray
Returns
-------
Numeric1DArray
Percent point function.
Notes
-----
The PPF is a polynomial approximation of the real PPF. Because both cdf and
ppf are approximations, each is close to, but not exactly, the inverse
of the other.
"""
return self._ppf(y, self.wl, self.w0, self.wf)
@beartype
def _argcheck(self, wl: Number, w0: Number, wf: Number):
if not (w0 < wf):
raise ValueError("w0 must be < than wf")
if self.a:
if self.b and (self.a > self.b or np.isclose(self.a, self.b)):
raise ValueError("a must be < b")
if self.a > wf or np.isclose(self.a, wf):
raise ValueError("a must be < wf")
return True
def rvs(self, size: int = 1) -> Numeric1DArray:
"""Generate random variates from the EDSD distribution.
Parameters
----------
size : int, optional
Size of the sample, by default 1
Returns
-------
Numeric1DArray
Generated sample.
"""
wl, w0, wf = self.wl, self.w0, self.wf
self._argcheck(wl, w0, wf)
limits = np.array([max(w0 + 1e-10, self.a), min(wf - 1e-10, self.b)]).astype(
"float64"
)
rv_limits = self._cdf(limits, wl, w0, wf)
sample = np.array([])
while sample.shape[0] < size:
y = np.random.uniform(low=rv_limits[0], high=rv_limits[1], size=size)
new_sample = self._ppf(y, wl, w0, wf)
new_sample = new_sample[
(new_sample >= limits[0]) & (new_sample <= limits[1])
]
sample = np.concatenate((sample, new_sample), axis=0)
return sample[:size]
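# A minimal sketch of drawing a parallax-like sample from the distribution; the
# parameter values below are arbitrary illustrations, not recommended defaults
# (see the literalinclude example referenced in the class docstring):
# >>> edsd = EDSD(w0=0.0, wl=1.2, wf=11.0, a=0.01, b=11.0)
# >>> sample = edsd.rvs(1000)        # values restricted to (w0, wf)
# >>> densities = edsd.pdf(sample)   # evaluate the density profile at the sample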
# Data generators
@define
class StarCluster:
"""Class for generating astrometric data from a star cluster.
Attributes
----------
space : stats._multivariate.multi_rv_frozen
Space distribution of the cluster. It must have 3 dimensions.
pm : stats._multivariate.multi_rv_frozen
Proper motion distribution of the cluster. It must have 2 dimensions.
representation_type : str
Type of representation of the cluster. It must be "spherical" or "cartesian",
by default "spherical".
n_stars : int
Number of cluster stars to be generated, by default 200.
Returns
-------
StarCluster :
An instance of the StarCluster class.
"""
space: stats._multivariate.multi_rv_frozen = field(
validator=[
_type(stats._multivariate.multi_rv_frozen),
_dist_has_n_dimensions(n=3),
]
)
pm: stats._multivariate.multi_rv_frozen = field(
validator=[
_type(stats._multivariate.multi_rv_frozen),
_dist_has_n_dimensions(n=2),
]
)
representation_type: str = field(
validator=[_type(str), validators.in_(["cartesian", "spherical"])],
default="spherical",
)
n_stars: int = field(validator=[_type(int), _in_range(0, "inf")], default=200)
# TODO: fails when n is 1
def rvs(self) -> pd.DataFrame:
"""Generate random sample from the Star Cluster distribution.
Returns
-------
pd.DataFrame
Data frame with the samples.
"""
size = self.n_stars
data = pd.DataFrame()
xyz = np.atleast_2d(self.space.rvs(size))
if self.representation_type == "spherical":
data["ra"], data["dec"], data["parallax"] = cartesian_to_polar(xyz).T
else:
data[["x", "y", "z"]] = pd.DataFrame(xyz)
pm = np.atleast_2d(self.pm.rvs(size))
data[["pmra", "pmdec"]] = pd.DataFrame(pm)
return data
def pdf(self, data: pd.DataFrame) -> Numeric1DArray:
"""Joint probability density function of the StarCluster distribution.
Parameters
----------
data : pd.DataFrame
Data to be evaluated. Must contain the columns "ra", "dec", "parallax"
or "x", "y", "z", and "pmra", "pmdec".
Returns
-------
Numeric1DArray
PDF values for the data.
"""
pm_pdf = self.pm.pdf(data[["pmra", "pmdec"]].to_numpy())
if set(["x", "y", "z"]).issubset(set(data.columns)):
space_pdf = self.space.pdf(data[["x", "y", "z"]].to_numpy())
else:
xyz = polar_to_cartesian(data[["ra", "dec", "parallax"]].to_numpy())
space_pdf = self.space.pdf(xyz)
return pm_pdf * space_pdf
def pm_pdf(self, data: pd.DataFrame) -> Numeric1DArray:
"""Probability density function of the Proper Motion distribution.
Parameters
----------
data : pd.DataFrame
Data to be evaluated.
Returns
-------
Numeric1DArray
PDF values for the data.
"""
return self.pm.pdf(data[["pmra", "pmdec"]].to_numpy())
def space_pdf(self, data: pd.DataFrame) -> Numeric1DArray:
"""Probability density function of the space distribution.
Parameters
----------
data : pd.DataFrame
Data to be evaluated. Must have columns "x", "y", "z" or
"ra", "dec", "parallax".
Returns
-------
Numeric1DArray
PDF values for the data.
"""
if set(["x", "y", "z"]).issubset(set(data.columns)):
space_pdf = self.space.pdf(data[["x", "y", "z"]].to_numpy())
else:
xyz = polar_to_cartesian(data[["ra", "dec", "parallax"]].to_numpy())
space_pdf = self.space.pdf(xyz)
return space_pdf
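# An illustrative construction (all values are arbitrary): any frozen scipy
# distribution with the right dimensionality works, e.g. multivariate normals
# centred on a cartesian position (in pc) and on a mean proper motion (in mas/yr).
# >>> from scipy import stats
# >>> cluster = StarCluster(
# ...     space=stats.multivariate_normal(
# ...         mean=polar_to_cartesian([120.7, -28.5, 5.0]), cov=0.5),
# ...     pm=stats.multivariate_normal(mean=(-5.75, 7.25), cov=1.0),
# ...     n_stars=200,
# ... )
# >>> members = cluster.rvs()   # columns: ra, dec, parallax, pmra, pmdec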
@define
class StarField:
"""Class for generating star field data.
Attributes
----------
space : stats._multivariate.multi_rv_frozen
Distribution of the space coordinates. Must have 3 dimensions.
pm : stats._multivariate.multi_rv_frozen
Distribution of the proper motion coordinates. Must have 2 dimensions.
representation_type : str
Type of representation of the spatial data. Must be "cartesian" or "spherical",
by default "spherical".
n_stars : int
Number of field stars to be generated, by default 100000.
Returns
-------
StarField :
An instance of the StarField class.
"""
space: stats._multivariate.multi_rv_frozen = field(
validator=[
_type(stats._multivariate.multi_rv_frozen),
_dist_has_n_dimensions(n=3),
]
)
pm: stats._multivariate.multi_rv_frozen = field(
validator=[
_type(stats._multivariate.multi_rv_frozen),
_dist_has_n_dimensions(n=2),
]
)
representation_type: str = field(
validator=[_type(str), validators.in_(["cartesian", "spherical"])],
default="spherical",
)
n_stars: int = field(validator=[_type(int), _in_range(0, "inf")], default=int(1e5))
# TODO: test
def rvs(self) -> pd.DataFrame:
"""Generate random sample from the Star Field distribution.
Returns
-------
pd.DataFrame
Contains columns for the space distribution and the pm distribution.
"""
size = self.n_stars
data = pd.DataFrame()
xyz = np.atleast_2d(self.space.rvs(size))
pm = np.atleast_2d(self.pm.rvs(size))
data[["pmra", "pmdec"]] = pd.DataFrame(np.vstack((pm[:, 0], pm[:, 1])).T)
if self.representation_type == "spherical":
ra_dec_plx = cartesian_to_polar(xyz)
data[["ra", "dec", "parallax"]] = pd.DataFrame(ra_dec_plx)
else:
data[["x", "y", "z"]] = pd.DataFrame(xyz)
return data
# TODO: test
def pdf(self, data: pd.DataFrame) -> Numeric1DArray:
"""Joint Probability Density Function of the Star Field.
Parameters
----------
data : pd.DataFrame
Data to be evaluated.
Returns
-------
Numeric1DArray
PDF values for the data.
"""
pm_pdf = self.pm.pdf(data[["pmra", "pmdec"]].to_numpy())
if set(["x", "y", "z"]).issubset(set(data.columns)):
space_pdf = self.space.pdf(data[["x", "y", "z"]].to_numpy())
else:
xyz = polar_to_cartesian(data[["ra", "dec", "parallax"]].to_numpy())
space_pdf = self.space.pdf(xyz)
return pm_pdf * space_pdf
def pm_pdf(self, data: pd.DataFrame) -> Numeric1DArray:
"""Probability density function of the Proper Motion distribution.
Parameters
----------
data : pd.DataFrame
Data to be evaluated. Must have columns "pmra" and "pmdec".
Returns
-------
Numeric1DArray
PDF of the Proper Motion distribution.
"""
return self.pm.pdf(data[["pmra", "pmdec"]].to_numpy())
def space_pdf(self, data: pd.DataFrame) -> Numeric1DArray:
"""Probability density function of the space distribution.
Parameters
----------
data : pd.DataFrame
Data to be evaluated. Must have columns "x", "y", "z"
or "ra", "dec", "parallax".
Returns
-------
Numeric1DArray
PDF of the space distribution.
"""
if set(["x", "y", "z"]).issubset(set(data.columns)):
space_pdf = self.space.pdf(data[["x", "y", "z"]].to_numpy())
else:
xyz = polar_to_cartesian(data[["ra", "dec", "parallax"]].to_numpy())
space_pdf = self.space.pdf(xyz)
return space_pdf
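# A hedged sketch of a broad field population along the same line of sight,
# again with frozen scipy distributions and arbitrary parameters:
# >>> star_field = StarField(
# ...     space=stats.multivariate_normal(
# ...         mean=polar_to_cartesian([120.7, -28.5, 5.0]), cov=100.0),
# ...     pm=stats.multivariate_normal(mean=(0.0, 0.0), cov=10.0),
# ...     n_stars=1000,
# ... )
# >>> field_stars = star_field.rvs()   # same columns as the cluster sample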
@define
class Synthetic:
"""Class for generating synthetic data.
Attributes
----------
star_field : StarField
The star field from which the background population is drawn.
clusters : List[StarCluster]
The star clusters to be added to the field, by default an empty list.
representation_type : str
The representation type of the generated data, it must be "cartesian"
or "spherical", by default "spherical".
Examples
--------
.. literalinclude:: ../../examples/synthetic/synthetic.py
:language: python
:linenos:
.. image:: ../../examples/synthetic/synthetic.png
"""
star_field: StarField = field(validator=_type(StarField))
representation_type: str = field(
validator=[_type(str), validators.in_(["cartesian", "spherical"])],
default="spherical",
)
clusters: List[StarCluster] = Factory(list)
# TODO: test
def rvs(self) -> pd.DataFrame:
"""Generate synthetic data from field and cluster distributions.
Returns
-------
pd.DataFrame
Synthetic data with columns for the space and proper motion coordinates,
plus approximate membership probabilities computed without taking
measurement errors into account.
"""
self.star_field.representation_type = "cartesian"
data = self.star_field.rvs()
for i in range(len(self.clusters)):
self.clusters[i].representation_type = "cartesian"
cluster_data = self.clusters[i].rvs()
data = pd.concat([data, cluster_data], axis=0)
# TODO: improve
total_stars = sum([c.n_stars for c in self.clusters]) + self.star_field.n_stars
field_mixing_ratio = float(self.star_field.n_stars) / float(total_stars)
field_p = self.star_field.pdf(data) * field_mixing_ratio
field_pmp = self.star_field.pm_pdf(data) * field_mixing_ratio
field_spacep = self.star_field.space_pdf(data) * field_mixing_ratio
clusters_mixing_ratios = [
float(c.n_stars) / float(total_stars) for c in self.clusters
]
cluster_ps = np.array(
[
self.clusters[i].pdf(data) * clusters_mixing_ratios[i]
for i in range(len(self.clusters))
]
)
cluster_pmps = np.array(
[
self.clusters[i].pm_pdf(data) * clusters_mixing_ratios[i]
for i in range(len(self.clusters))
]
)
cluster_spaceps = np.array(
[
self.clusters[i].space_pdf(data) * clusters_mixing_ratios[i]
for i in range(len(self.clusters))
]
)
total_p = cluster_ps.sum(axis=0) + field_p
total_pmp = cluster_pmps.sum(axis=0) + field_pmp
total_spacep = cluster_spaceps.sum(axis=0) + field_spacep
total_clusters_probs = 0
total_clusters_pmprobs = 0
total_clusters_spaceprobs = 0
for i in range(len(self.clusters)):
data[f"p_cluster{i+1}"] = cluster_ps[i] / total_p
total_clusters_probs += cluster_ps[i] / total_p
data[f"p_pm_cluster{i+1}"] = cluster_pmps[i] / total_pmp
total_clusters_pmprobs += cluster_pmps[i] / total_pmp
data[f"p_space_cluster{i+1}"] = cluster_spaceps[i] / total_spacep
total_clusters_spaceprobs += cluster_spaceps[i] / total_spacep
data["p_field"] = 1 - total_clusters_probs
data["p_pm_field"] = 1 - total_clusters_pmprobs
data["p_space_field"] = 1 - total_clusters_spaceprobs
if self.representation_type == "spherical":
xyz = data[["x", "y", "z"]].to_numpy()
data["ra"], data["dec"], data["parallax"] = cartesian_to_polar(xyz).T
data["log10_parallax"] = np.log10(data["parallax"])
data.drop(["x", "y", "z"], inplace=True, axis=1)
return data
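# Putting the pieces together (an illustrative sketch that reuses the hypothetical
# star_field and cluster objects from the snippets above): the resulting frame also
# carries approximate membership columns such as p_cluster1, p_pm_cluster1,
# p_space_cluster1 and p_field.
# >>> synthetic = Synthetic(star_field=star_field, clusters=[cluster])
# >>> df = synthetic.rvs()
# >>> df[["ra", "dec", "parallax", "pmra", "pmdec", "p_cluster1", "p_field"]].head()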