Source code for raptorstats.stats

import collections

import numpy as np

DEFAULT_STATS = ["count", "min", "max", "mean"]

VALID_STATS = DEFAULT_STATS + [
    "sum",
    "std",
    "median",
    "majority",
    "minority",
    "unique",
    "range",
    "nodata",
    "nan",
]


class Stats:
    """Configuration for statistics computation.

    Stores the statistics to be computed and has methods for computing
    them from arrays or partial results.
    """

    def __init__(self, stats=None, categorical=False):
        if not stats:
            stats = [] if categorical else DEFAULT_STATS
        elif isinstance(stats, str):
            if stats in ["*", "ALL"]:
                stats = VALID_STATS
            else:
                stats = stats.split()

        percentiles = []
        for token in stats:
            if token.startswith("percentile_"):
                percentiles.append(self._get_percentile(token))
            elif token not in VALID_STATS:
                raise ValueError(
                    "Stat `%s` not valid; must be one of\n%r" % (token, VALID_STATS)
                )

        requested = set(stats)
        required = set(stats)
        required.update({"count"})  # count is always useful
        if {"mean", "std", "median"} & required or percentiles:
            required.update({"count", "sum"})
        if "std" in required:
            required.update({"mean"})
        if "range" in required:
            required.update({"min", "max"})

        run_count = (
            categorical
            or percentiles
            or required & {"majority", "minority", "unique", "median"}
        )

        # Order: the caller's order first, then the canonical VALID_STATS
        # order for dependencies that were added implicitly.
        ordered = []
        for tok in stats + VALID_STATS:
            if tok in required and tok not in ordered:
                ordered.append(tok)
                required.remove(tok)
        for tok in required:
            ordered.append(tok)

        self.requested = requested
        self.stats = ordered
        self.categorical = categorical
        self.run_count = run_count
        self.percentiles = percentiles
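
    # Illustrative sketch (not part of the original source): constructing a
    # Stats object expands dependencies. Requesting only "std" and a
    # percentile also schedules "count", "sum" and "mean", which are needed
    # to derive them, and the percentile token forces histogram collection.
    #
    #   >>> cfg = Stats(stats="std percentile_25")
    #   >>> cfg.stats
    #   ['std', 'percentile_25', 'count', 'mean', 'sum']
    #   >>> cfg.percentiles
    #   [25.0]
    #   >>> bool(cfg.run_count)
    #   True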

    def clean_results(self, results):
        """Drop helper statistics that were computed but not requested."""
        for res in results:
            if not res:
                continue
            for k in list(res.keys()):
                if self.categorical and k == "histogram":
                    continue
                if k not in self.requested:
                    del res[k]
        return results
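
    # Illustrative sketch (not part of the original source): helper stats
    # computed only as dependencies are stripped again by `clean_results`,
    # so callers see exactly what they asked for.
    #
    #   >>> cfg = Stats(stats="std")
    #   >>> res = cfg.from_array(np.array([1.0, 2.0, 3.0]))
    #   >>> sorted(res)          # "count", "mean" and "sum" were computed too
    #   ['count', 'mean', 'std', 'sum']
    #   >>> cfg.clean_results([res])
    #   [{'std': 0.816496580927726}]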

    def _get_percentile(self, stat):
        if not stat.startswith("percentile_"):
            raise ValueError("must start with 'percentile_'")
        qstr = stat.replace("percentile_", "")
        q = float(qstr)
        if q > 100.0:
            raise ValueError("percentiles must be <= 100")
        if q < 0.0:
            raise ValueError("percentiles must be >= 0")
        return q

    def __repr__(self):
        return (
            f"Stats(stats={self.stats}, "
            f"categorical={self.categorical}, run_count={self.run_count})"
        )

    def empty(self):
        """Return an empty stats dict with the requested keys."""
        st = {s: np.nan for s in self.stats} if self.stats else {}
        st["count"] = 0
        if self.categorical:
            st["histogram"] = {}
        if "nodata" in self.stats:
            st["nodata"] = 0
        if "nan" in self.stats:
            st["nan"] = 0
        return st

    def from_array(self, data: np.ma.MaskedArray) -> dict:
        """Compute the configured statistics on *data*.

        Parameters
        ----------
        data : np.ndarray or np.ma.MaskedArray
            Pixel values for one feature. Masked or NaN values are
            treated as nodata.

        Returns
        -------
        dict
            Keys are the statistic names requested at construction time
            (plus a "histogram" {value: count} mapping if
            categorical=True). Values are floats or ints; for empty
            inputs the statistics remain NaN.
        """
        # ---------- normalise to plain ndarray + boolean mask -----------
        if np.ma.isMaskedArray(data):
            mask = np.ma.getmaskarray(data)  # handles the nomask case
            arr = data.data
        else:
            arr = np.asarray(data)
            mask = np.zeros(arr.shape, dtype=bool)

        nan_mask = np.isnan(arr)
        nodata_mask = mask | nan_mask
        valid = arr[~nodata_mask]

        out = {s: np.nan for s in self.stats}
        if "nodata" in self.stats:
            out["nodata"] = int(mask.sum())
        if "nan" in self.stats:
            out["nan"] = int(nan_mask.sum())

        n = valid.size
        if "count" in self.stats:
            out["count"] = int(n)

        if n:
            # Basic stats
            if "sum" in self.stats:
                out["sum"] = float(valid.sum())
            if "min" in self.stats:
                out["min"] = float(valid.min())
            if "max" in self.stats:
                out["max"] = float(valid.max())
            if "mean" in self.stats:
                out["mean"] = float(valid.mean())
            if "range" in self.stats:
                out["range"] = float(valid.max() - valid.min())
            if "std" in self.stats:
                out["std"] = float(np.std(valid, ddof=0))
            if "median" in self.stats:
                out["median"] = float(np.median(valid))

            # Percentiles (key matches the requested token, e.g. "percentile_25")
            for q in self.percentiles:
                key = f"percentile_{int(q) if q.is_integer() else q}"
                out[key] = float(np.percentile(valid, q))

            # Histogram / categorical stats
            if self.run_count:
                vals, counts = np.unique(valid, return_counts=True)
                counter = dict(zip(vals.tolist(), counts.tolist()))
                if "unique" in self.stats:
                    out["unique"] = len(counter)
                if "majority" in self.stats and counter:
                    out["majority"] = float(max(counter, key=counter.get))
                if "minority" in self.stats and counter:
                    out["minority"] = float(min(counter, key=counter.get))
                out["histogram"] = counter
        elif self.run_count:
            out["histogram"] = {}

        return out
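
    # Illustrative sketch (not part of the original source): masked pixels
    # and NaNs are both excluded from the valid set but tallied separately;
    # "sum" appears in the output because it is required to derive "mean".
    #
    #   >>> cfg = Stats(stats="count min max mean nodata nan")
    #   >>> data = np.ma.masked_array(
    #   ...     [1.0, 2.0, np.nan, 4.0], mask=[False, True, False, False]
    #   ... )
    #   >>> cfg.from_array(data)
    #   {'count': 2, 'min': 1.0, 'max': 4.0, 'mean': 2.5, 'nodata': 1, 'nan': 1, 'sum': 5.0}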

    # ------------------------------------------------------------------ #

    def from_partials(self, partials: list[dict]) -> dict:
        """Combine already-computed chunk statistics into one feature-level dict.

        Each element of *partials* must have been produced by `from_array()`
        with **the same Stats configuration**.

        Returns
        -------
        dict
            Feature-level statistics, with exactly the same schema as
            calling `from_array()` on the full dataset at once.
        """
        # discard empty / None chunks
        chunks = [p for p in partials if p]

        # nothing at all
        if not chunks:
            out = {stat: np.nan for stat in self.stats}
            out["count"] = 0
            return out
        if len(chunks) == 1:
            return chunks[0].copy()

        # accumulators
        total_count = 0
        total_sum = 0.0
        total_mean = 0.0
        total_M2 = 0.0
        total_nodata = 0
        total_nan = 0
        global_min = np.inf
        global_max = -np.inf
        histogram = collections.Counter()

        for r in chunks:
            # counts (nodata/nan are tallied even for all-masked chunks)
            n = r.get("count", 0)
            total_nodata += r.get("nodata", 0) or 0
            total_nan += r.get("nan", 0) or 0
            if not n:
                continue

            # sum / mean
            s = r.get("sum")
            if s is None:
                s = 0.0
            m = r.get("mean")
            if m is None or (isinstance(m, float) and np.isnan(m)):
                m = (s / n) if n else 0.0  # fallback if mean missing

            # std -> M2
            std = r.get("std")
            if std is None or (isinstance(std, float) and np.isnan(std)):
                M2 = 0.0
            else:
                # allow true zero std to pass through
                M2 = float(std) ** 2 * n

            # combine (pairwise update of Welford's method)
            delta = m - total_mean
            new_count = total_count + n
            total_M2 += M2 + (delta * delta) * (total_count * n) / new_count
            total_mean += delta * n / new_count
            total_count = new_count
            total_sum += s

            # min/max
            vmin = r.get("min")
            if vmin is not None and not np.isnan(vmin):
                global_min = min(global_min, vmin)
            vmax = r.get("max")
            if vmax is not None and not np.isnan(vmax):
                global_max = max(global_max, vmax)

            # histogram
            h = r.get("histogram")
            if h:
                histogram.update(h)

        out = {stat: np.nan for stat in self.stats}
        if "nodata" in self.stats:
            out["nodata"] = int(total_nodata)
        if "nan" in self.stats:
            out["nan"] = int(total_nan)
        if "count" in self.stats:
            out["count"] = int(total_count)

        if total_count:
            if "sum" in self.stats:
                out["sum"] = float(total_sum)
            if "min" in self.stats:
                out["min"] = float(global_min)
            if "max" in self.stats:
                out["max"] = float(global_max)
            if "mean" in self.stats:
                out["mean"] = float(total_sum / total_count)
            if "range" in self.stats:
                out["range"] = float(global_max - global_min)
            if "std" in self.stats:
                out["std"] = float(np.sqrt(total_M2 / total_count))

            if "median" in self.stats or self.percentiles:
                # build the distribution from the merged histogram
                vals, cnts = zip(*sorted(histogram.items()))

                def _quantile(vals, cnts, q):
                    vals = np.asarray(vals, float)
                    cnts = np.asarray(cnts, int)
                    order = np.argsort(vals)
                    vals, cnts = vals[order], cnts[order]
                    cum = np.cumsum(cnts)
                    N = cum[-1]
                    # NumPy-like "linear" interpolation between order stats
                    k = (N - 1) * q
                    j = np.floor(k).astype(int)
                    g = k - j

                    def kth(k0):
                        # value holding the k0-th (0-based) order statistic
                        return vals[np.searchsorted(cum, k0 + 1, side="left")]

                    vj = np.vectorize(kth)(j)
                    vj1 = np.vectorize(kth)(np.minimum(j + 1, N - 1))
                    return (1 - g) * vj + g * vj1

                if "median" in self.stats:
                    out["median"] = float(_quantile(vals=vals, cnts=cnts, q=0.5))
                for q in self.percentiles:
                    key = f"percentile_{int(q) if q.is_integer() else q}"
                    out[key] = float(_quantile(vals=vals, cnts=cnts, q=q / 100.0))

            if self.run_count:
                if "unique" in self.stats:
                    out["unique"] = len(histogram)
                if "majority" in self.stats:
                    out["majority"] = float(max(histogram, key=histogram.get))
                if "minority" in self.stats:
                    out["minority"] = float(min(histogram, key=histogram.get))
                out["histogram"] = dict(histogram)
        elif self.run_count:
            # empty input, but categorical requested
            out["histogram"] = dict(histogram)

        return out
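

# Illustrative end-to-end sketch (not part of the original source): combining
# chunk-level partials with `from_partials` reproduces the single-pass result
# of `from_array` on the concatenated data, including the histogram-based
# median and percentile.
if __name__ == "__main__":
    cfg = Stats(stats="count mean std median percentile_90")
    chunk_a = np.ma.masked_invalid([1.0, 2.0, 2.0, np.nan])
    chunk_b = np.ma.masked_invalid([3.0, 5.0, 8.0])

    partials = [cfg.from_array(chunk_a), cfg.from_array(chunk_b)]
    combined = cfg.from_partials(partials)
    direct = cfg.from_array(np.ma.concatenate([chunk_a, chunk_b]))

    for key in cfg.stats:
        assert np.isclose(combined[key], direct[key]), key
    print(cfg.clean_results([combined])[0])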