import collections
import numpy as np
DEFAULT_STATS = ["count", "min", "max", "mean"]
VALID_STATS = DEFAULT_STATS + [
"sum",
"std",
"median",
"majority",
"minority",
"unique",
"range",
"nodata",
"nan",
]
[docs]
class Stats:
"""Configuration for statistics computation.
Stores the statistics to be computed and has methods for
computing them from arrays or partial results.
"""
def __init__(self, stats=None, categorical=False):
if not stats:
stats = [] if categorical else DEFAULT_STATS
elif isinstance(stats, str):
if stats in ["*", "ALL"]:
stats = VALID_STATS
else:
stats = stats.split()
percentiles = []
for token in stats:
if token.startswith("percentile_"):
percentiles.append(self._get_percentile(token))
elif token not in VALID_STATS:
raise ValueError(
"Stat `%s` not valid; "
"must be one of \n %r" % (token, VALID_STATS)
)
requested = set(stats)
required = set(stats)
required.update({"count"}) # count is always useful
if {"mean", "std", "median"} & required or percentiles:
required.update({"count", "sum"})
if "std" in required:
required.update({"mean"})
if "range" in required:
required.update({"min", "max"})
run_count = (
categorical
or percentiles
or required & {"majority", "minority", "unique", "median"}
)
ordered = []
for tok in stats + VALID_STATS:
if tok in required and tok not in ordered:
ordered.append(tok)
required.remove(tok)
for tok in required:
ordered.append(tok)
self.requested = requested
self.stats = ordered
self.categorical = categorical
self.run_count = run_count
self.percentiles = percentiles
[docs]
def clean_results(self, results):
for res in results:
if not res:
continue
for k in list(res.keys()):
if self.categorical and k == "histogram":
continue
if k not in self.requested:
del res[k]
return results
def _get_percentile(self, stat):
if not stat.startswith("percentile_"):
raise ValueError("must start with 'percentile_'")
qstr = stat.replace("percentile_", "")
q = float(qstr)
if q > 100.0:
raise ValueError("percentiles must be <= 100")
if q < 0.0:
raise ValueError("percentiles must be >= 0")
return q
def __repr__(self):
return f"StatsConfig(stats={self.stats}, categorical={self.categorical}, run_count={self.run_count})"
[docs]
def empty(self):
"""Return an empty stats dict with the requested keys."""
st = {s: np.nan for s in self.stats} if self.stats else {}
st["count"] = 0
if self.categorical:
st["histogram"] = {}
if "nodata" in self.stats:
st["nodata"] = 0
if "nan" in self.stats:
st["nan"] = 0
return st
[docs]
def from_array(self, data: np.ma.MaskedArray) -> dict:
"""Compute the configured statistics on *data*.
Parameters
----------
data : np.ndarray or np.ma.MaskedArray
Pixel values for one feature. Masked or NaN values are treated
as nodata.
Returns
-------
dict
Keys are the statistic names requested at construction time
(plus every {value: count} entry if categorical=True).
Values are floats or ints; empty inputs yield None.
"""
# ---------- normalise to plain ndarray + boolean mask -----------
if np.ma.isMaskedArray(data):
mask = data.mask
arr = data.data
else:
arr = np.asarray(data)
mask = np.zeros(arr.shape, dtype=bool)
nan_mask = np.isnan(arr)
nodata_mask = mask | nan_mask
valid = arr[~nodata_mask]
out = {s: np.nan for s in self.stats}
if "nodata" in self.stats:
out["nodata"] = int(mask.sum())
if "nan" in self.stats:
out["nan"] = int(nan_mask.sum())
n = valid.size
if "count" in self.stats:
out["count"] = int(n)
if n:
# Basic stats
if "sum" in self.stats:
out["sum"] = float(valid.sum())
if "min" in self.stats:
out["min"] = float(valid.min())
if "max" in self.stats:
out["max"] = float(valid.max())
if "mean" in self.stats:
out["mean"] = float(valid.mean())
if "range" in self.stats:
out["range"] = float(valid.max() - valid.min())
# if np.allclose(out["mean"], 229.543):
# print("stop")
if "std" in self.stats:
out["std"] = float(np.std(valid, ddof=0))
if "median" in self.stats:
out["median"] = float(np.median(valid))
# Percentiles
for tok in self.percentiles:
out["percentile_" + str(int(tok))] = float(np.percentile(valid, tok))
# Histogram / Categorical stats
if self.run_count:
vals, counts = np.unique(valid, return_counts=True)
counter = dict(zip(vals.tolist(), counts.tolist()))
if "unique" in self.stats:
out["unique"] = len(counter)
if "majority" in self.stats and counter:
out["majority"] = float(max(counter, key=counter.get))
if "minority" in self.stats and counter:
out["minority"] = float(min(counter, key=counter.get))
out["histogram"] = counter
elif self.run_count:
out["histogram"] = {}
return out
# ------------------------------------------------------------------ #
[docs]
def from_partials(self, partials: list[dict]) -> dict:
"""Combine already-computed chunk statistics into one feature-level dict.
Each element of *partials* must have been produced by `from_array()`
with **the same Stats configuration**.
Returns
-------
dict
Feature-level statistics – exactly the same schema you get from
calling `from_array()` on the full dataset at once.
"""
# discard empty / None chunks
chunks = [p for p in partials if p]
# nothing at all
if not chunks:
out = {stat: np.nan for stat in self.stats}
out["count"] = 0
return out
if len(chunks) == 1:
return chunks[0].copy()
# helpers to accumulate
total_count = 0
total_sum = 0.0
total_mean = 0.0
total_M2 = 0.0
total_nodata = 0
total_nan = 0
global_min = np.inf
global_max = -np.inf
histogram = collections.Counter()
for r in chunks:
# counts
n = r.get("count", 0)
if not n:
total_nodata += r.get("nodata", 0) or 0
total_nan += r.get("nan", 0) or 0
continue
total_nodata += r.get("nodata", 0) or 0
total_nan += r.get("nan", 0) or 0
# sum / mean
s = r.get("sum")
if s is None:
s = 0.0
m = r.get("mean")
if m is None or (isinstance(m, float) and np.isnan(m)):
m = (s / n) if n else 0.0 # fallback if mean missing
# std -> M2
std = r.get("std")
if std is None or (isinstance(std, float) and np.isnan(std)):
M2 = 0.0
else:
# allow true zero std to pass through
M2 = float(std) ** 2 * n
# combine (Welford pairwise)
delta = m - total_mean
new_count = total_count + n
total_M2 += M2 + (delta * delta) * (total_count * n) / new_count
total_mean += delta * n / new_count
total_count = new_count
total_sum += s
# min/max
vmin = r.get("min")
if vmin is not None and not np.isnan(vmin):
global_min = min(global_min, vmin)
vmax = r.get("max")
if vmax is not None and not np.isnan(vmax):
global_max = max(global_max, vmax)
# histogram
h = r.get("histogram")
if h:
histogram.update(h)
out = {stat: np.nan for stat in self.stats}
if "nodata" in self.stats:
out["nodata"] = int(total_nodata)
if "nan" in self.stats:
out["nan"] = int(total_nan)
if "count" in self.stats:
out["count"] = int(total_count)
if total_count:
if "sum" in self.stats:
out["sum"] = float(total_sum)
if "min" in self.stats:
out["min"] = float(global_min)
if "max" in self.stats:
out["max"] = float(global_max)
if "mean" in self.stats:
out["mean"] = float(total_sum / total_count)
# if np.allclose(out["mean"], 229.543):
# print("stop")
if "range" in self.stats:
out["range"] = float(global_max - global_min)
if "std" in self.stats:
out["std"] = float(np.sqrt(total_M2 / total_count))
if "median" in self.stats or self.percentiles:
# build cumulative distribution
vals, cnts = zip(*sorted(histogram.items()))
# cum = np.cumsum(cnts)
def _quantile(vals, cnts, q):
vals = np.asarray(vals, float)
cnts = np.asarray(cnts, int)
order = np.argsort(vals)
vals, cnts = vals[order], cnts[order]
cum = np.cumsum(cnts)
N = cum[-1]
# NumPy-like "linear" between order stats
k = (N - 1) * q
j = np.floor(k).astype(int)
g = k - j
def kth(k0):
return vals[np.searchsorted(cum, k0 + 1, side="left")]
vj = np.vectorize(kth)(j)
vj1 = np.vectorize(kth)(np.minimum(j + 1, N - 1))
return (1 - g) * vj + g * vj1
if "median" in self.stats:
out["median"] = _quantile(vals=vals, cnts=cnts, q=0.5)
for q in self.percentiles:
out[f"percentile_{int(q)}"] = _quantile(
vals=vals, cnts=cnts, q=q / 100.0
)
if self.run_count:
if "unique" in self.stats:
out["unique"] = len(histogram)
if "majority" in self.stats:
out["majority"] = float(max(histogram, key=histogram.get))
if "minority" in self.stats:
out["minority"] = float(min(histogram, key=histogram.get))
out["histogram"] = dict(histogram)
elif self.run_count:
# empty input, but categorical requested
out["histogram"] = dict(histogram)
return out