"""Source code for raptorstats.scanline."""

import geopandas as gpd
import numpy as np

# Debugging code
# End Debugging code
import rasterio as rio
import shapely
from shapely import LineString, MultiLineString

from raptorstats.stats import Stats
from raptorstats.zonal_stat_method import ZonalStatMethod


def xy_to_rowcol(xs, ys, transform):
    """Map world (x, y) coordinates to integer raster (row, col) indices.

    Rows are taken as the cell containing the y coordinate (floor).
    Columns use ``ceil(c - 0.5)`` so that a point sitting exactly on a
    pixel centre is assigned to that pixel (left-inclusive), rather than
    plain rounding. Both indices are clipped at zero.
    """
    frac_cols, frac_rows = (~transform) * (xs, ys)
    row_idx = np.floor(frac_rows).astype(int).clip(min=0)
    # ceil(c - 0.5) behaves like round-half-down: left-inclusive columns.
    col_idx = np.ceil(frac_cols - 0.5).astype(int).clip(min=0)
    return row_idx, col_idx
def rows_to_ys(rows, transform):
    """Return the world y coordinate of the centre of each raster row.

    Adding 0.5 shifts from the row's top edge to its centre before the
    forward affine transform is applied; only the y component is kept.
    """
    _, ys = transform * (0, rows + 0.5)
    return ys
def build_intersection_table(features: gpd.GeoDataFrame, raster: rio.DatasetReader):
    """Intersect features with scanlines and return the intersection table.

    One horizontal scanline is built per raster row (passing through the
    pixel centres), spanning slightly beyond the features' bounding window;
    every feature is intersected with all scanlines at once.

    Parameters
    ----------
    features : gpd.GeoDataFrame
    raster : rio.DatasetReader

    Returns
    -------
    (np.ndarray(int), np.ndarray(float))
        f_index, (y, x0, x1) coordinates

    Raises
    ------
    TypeError
        If the intersection type is not LineString or MultiLineString
    """
    # Pixel window covering all features, padded half a pixel so edge
    # geometries still get a scanline.
    window = rio.features.geometry_window(
        raster,
        features.geometry,
        pad_x=0.5,
        pad_y=0.5,
    )
    row_start, row_end = int(np.floor(window.row_off)), int(
        np.ceil(window.row_off + window.height)
    )
    col_start, col_end = int(np.floor(window.col_off)), int(
        np.ceil(window.col_off + window.width)
    )
    # Scanline x extent: one column beyond the window on each side.
    x0 = (raster.transform * (col_start - 1, 0))[0]
    x1 = (raster.transform * (col_end, 0))[0]
    all_rows = np.arange(row_start, row_end + 1)
    ys = rows_to_ys(all_rows, raster.transform)
    x0s = np.full_like(ys, x0, dtype=float)
    x1s = np.full_like(ys, x1, dtype=float)
    # all points defining the scanlines: shape (n_rows, 2, 2) of
    # [(x0, y), (x1, y)] endpoint pairs
    all_points = np.stack(
        [np.stack([x0s, ys], axis=1), np.stack([x1s, ys], axis=1)], axis=1
    )
    scanlines = MultiLineString(list(all_points))
    # Vectorized: one (Multi)LineString result per feature geometry.
    all_intersections = scanlines.intersection(features.geometry)
    intersection_table = []
    for f_index, inter in enumerate(all_intersections):
        if inter.is_empty:
            continue
        if isinstance(inter, MultiLineString):
            geoms = inter.geoms
        elif isinstance(inter, LineString):
            geoms = [inter]
        else:
            raise TypeError(f"Unexpected intersection type: {type(inter)}")
        # Vectorized endpoint extraction over all segments of this feature.
        g_arr = shapely.get_parts(geoms)
        starts = shapely.get_point(g_arr, 0)
        ends = shapely.get_point(g_arr, -1)
        x0s = shapely.get_x(starts)
        x1s = shapely.get_x(ends)
        # Segments are horizontal, so the start point's y holds for the
        # whole segment.
        ys = shapely.get_y(starts)
        # vertically stack the intersections
        # creating a numpy array of shape (n, 4)
        # with f_index, y, x0, x1
        intersection_table.extend(
            zip(np.full_like(ys, f_index, dtype=int), ys, x0s, x1s)
        )
    if not intersection_table:
        return np.array([], dtype=int), np.array([], dtype=float)
    intersection_table = np.array(intersection_table)
    f_index = intersection_table[:, 0].astype(int)
    coords = intersection_table[:, 1:4]
    return f_index, coords
def build_reading_table(
    f_index,
    intersection_coords,
    raster: rio.DatasetReader,
    return_coordinates=False,
    sort_by_feature=False,
):
    """Create a reading table indicating which pixels to read for each feature.

    Parameters
    ----------
    f_index : np.ndarray
        Feature index of each intersection segment.
    intersection_coords : np.ndarray
        A 2D array with one row per intersection segment holding the
        scanline y coordinate and the segment's x0 and x1 coordinates.
    raster : rio.DatasetReader
        A rasterio dataset reader object
    return_coordinates : bool, optional
        Whether to also return the (y, x0, x1) coordinates of the kept
        segments, by default False
    sort_by_feature : bool, optional
        Whether to sort the reading table by feature index and then by row,
        instead of just by row, by default False

    Returns
    -------
    np.ndarray or tuple of np.ndarray
        (row, col0, col1, f_index), plus (y, x0, x1) if return_coordinates
        is True
    """
    inter_ys = intersection_coords[:, 0]
    inter_x0s = intersection_coords[:, 1]
    inter_x1s = intersection_coords[:, 2]
    # Both segment endpoints lie on the same scanline, so the row computed
    # from the first endpoint is valid for both.
    rows, col0s = xy_to_rowcol(inter_x0s, inter_ys, raster.transform)
    _, col1s = xy_to_rowcol(inter_x1s, inter_ys, raster.transform)
    reading_table = np.stack([rows, col0s, col1s, f_index], axis=1).astype(int)
    # Drop degenerate reads where both intersections fall in between pixel
    # centers (no pixel center is covered).
    keep = col0s < col1s
    reading_table = reading_table[keep]
    if sort_by_feature:
        # sort by feature first then by row
        sort_idx = np.lexsort((reading_table[:, 0], reading_table[:, 3]))
    else:
        # sort just by row
        sort_idx = np.argsort(reading_table[:, 0])
    if return_coordinates:
        # BUGFIX: apply the same `keep` mask to the coordinates before using
        # sort_idx. Previously sort_idx (built on the filtered table) indexed
        # the UNFILTERED coordinate array, yielding misaligned rows whenever
        # any degenerate segment was dropped.
        # Coordinates are kept in a separate float array because the reading
        # table is integer-typed.
        coor = np.stack([inter_ys, inter_x0s, inter_x1s], axis=1)[keep][sort_idx]
        return reading_table[sort_idx], coor
    return reading_table[sort_idx]
def process_reading_table(
    reading_table: np.ndarray,
    features: gpd.GeoDataFrame,
    raster: rio.DatasetReader,
    stats: Stats,
    partials=None,
    max_collected_rows_percentage=10,
):
    """Read the pixels indicated by the reading table and compute statistics.

    Rows are read one raster line at a time (a 1-pixel-high window spanning
    the line's column extent), and pixel runs are dispatched to their
    features. To bound memory, partial statistics are folded in every
    ``max_collected_rows_percentage`` percent of the rows.

    Parameters
    ----------
    reading_table : np.ndarray
        A 2D array containing the reading table information
        (row, col0, col1, f_index per entry), sorted by row
    features : gpd.GeoDataFrame
        A GeoDataFrame containing the features
    raster : rio.DatasetReader
        A rasterio dataset reader object
    stats : Stats
        A Stats object for computing statistics
    partials : list, optional
        A list of partial per-feature statistics to combine with the
        computed statistics, by default None
    max_collected_rows_percentage : int, optional
        Percentage of rows to buffer before folding into partial
        statistics, by default 10

    Returns
    -------
    List[Dict]
        A list of statistics for each feature
    """

    def compute_partials(
        pixel_values_per_feature: list, results_per_feature: list, stats: Stats
    ):
        """Compute partials and append to the results_per_feature list.

        Both lists are n_features long. Assumes pixel_values_per_feature is
        a list of lists, and it's ordered by feature index.
        """
        for i in range(len(pixel_values_per_feature)):
            if not pixel_values_per_feature[i]:
                # No pixels collected for this feature: empty masked array.
                feature_data = np.ma.array([], mask=True)
            else:
                feature_data = np.ma.concatenate(pixel_values_per_feature[i])
            # get the stats
            r = stats.from_array(feature_data)
            results_per_feature[i].append(r)
        return results_per_feature

    # Group the (row-sorted) table by raster row: row_starts[i] is the index
    # of the first entry for rows[i].
    rows, row_starts = np.unique(reading_table[:, 0], return_index=True)
    pixel_values_per_feature = [[] for _ in range(len(features.geometry))]
    results_per_feature = [[] for _ in range(len(features.geometry))]
    max_rows = max_collected_rows_percentage / 100 * len(rows)
    row_count = 0
    for i, row in enumerate(rows):
        start = row_starts[i]
        end = row_starts[i + 1] if i + 1 < len(row_starts) else len(reading_table)
        reading_line = reading_table[start:end]
        # One window per raster line, spanning all features' reads on it.
        min_col = np.min(reading_line[:, 1])
        max_col = np.max(reading_line[:, 2])
        reading_window = rio.windows.Window(
            col_off=min_col, row_off=row, width=max_col - min_col, height=1
        )
        # Does not handle nodata
        data = raster.read(1, window=reading_window, masked=True)
        if data.shape[0] == 0:
            continue
        data = data[0]
        for j, col0, col1, f_index in reading_line:
            # Columns are absolute; shift into the window's local frame.
            c0 = col0 - min_col
            c1 = col1 - min_col
            pixel_values = data[c0:c1]
            if len(pixel_values) > 0:
                pixel_values_per_feature[f_index].append(pixel_values)
        # This is a chunking mechanism to avoid memory issues
        # for large datasets
        if row_count >= max_rows:
            results_per_feature = compute_partials(
                pixel_values_per_feature, results_per_feature, stats
            )
            # reset the pixel values per feature
            pixel_values_per_feature = [[] for _ in range(len(features.geometry))]
            # reset the row count
            row_count = 0
        else:
            row_count += 1
    # If there are still pixel values left, compute the partials
    # NOTE(review): this condition is always true — pixel_values_per_feature
    # is a non-empty list of (possibly empty) lists. compute_partials then
    # appends an empty-array partial for untouched features; presumably
    # from_partials treats these as neutral — confirm, or use any(...) here.
    if pixel_values_per_feature:
        results_per_feature = compute_partials(
            pixel_values_per_feature, results_per_feature, stats
        )
    final_results = []
    for i in range(len(results_per_feature)):
        if partials is not None and partials[i]:
            # If partials are provided, combine them with the results
            results_per_feature[i].extend(partials[i])
        r = stats.from_partials(results_per_feature[i])
        final_results.append(r)
    return final_results
class Scanline(ZonalStatMethod):
    """Zonal statistics computed with a scanline sweep.

    Each feature is intersected with one horizontal line per raster row;
    the resulting segments are converted to per-row pixel ranges, and only
    those ranges are read from the raster.
    """

    __name__ = "Scanline"

    def __init__(self):
        super().__init__()

    def _precomputations(self, features: gpd.GeoDataFrame, raster: rio.DatasetReader):
        # NOTE: ASSUMES NORTH UP, NO SHEAR AFFINE. ASSUMES GEOMETRIES ARE
        # POLYGONS OR MULTIPOLYGONS (NO POINTS, MULTIPOINTS, LINES)
        f_index, intersection_coords = build_intersection_table(features, raster)

        # No scanline crossed any feature: every feature gets empty stats.
        if not intersection_coords.size:
            self.results = [
                self.stats.from_array(np.ma.array([], mask=True))
                for _ in features.geometry
            ]
            return

        reading_table = build_reading_table(
            f_index,
            intersection_coords,
            raster,
            return_coordinates=False,
            sort_by_feature=False,
        )
        self.results = process_reading_table(
            reading_table, features, raster, self.stats
        )

    def _run(self, features: gpd.GeoDataFrame, raster: rio.DatasetReader):
        self._precomputations(features, raster)
        return self.results