# Source code for raptorstats.io

import json
import os
import warnings
from contextlib import contextmanager
from pathlib import Path
from typing import Sequence, Union

import geopandas as gpd
import numpy as np
import rasterio as rio
from affine import Affine
from rasterio.io import MemoryFile
from rasterio.vrt import WarpedVRT
from shapely import wkt
from shapely.geometry.base import BaseGeometry


def validate_raster_vector_compatibility(
    raster: rio.DatasetReader, vector: gpd.GeoDataFrame
):
    """Warn when the raster and the vector layer do not share a CRS.

    The check compares the string form of each CRS.  On a mismatch a
    ``UserWarning`` is emitted; nothing is ever raised, so processing
    continues either way.
    """
    if str(raster.crs) != str(vector.crs):
        warnings.warn(
            "Raster and vector CRS do not match. This may lead to incorrect results.",
            UserWarning,
        )
def validate_is_north_up(transform):
    """Warn if *transform* is not a standard north-up affine transform.

    A north-up transform has (near-)zero rotation/shear terms ``b`` and
    ``d`` and a non-positive row step ``e``.  Anything else triggers a
    ``UserWarning``; the function never raises.
    """
    axis_aligned = np.isclose(transform.b, 0) and np.isclose(transform.d, 0)
    if not (axis_aligned and transform.e <= 0):
        warnings.warn(
            "Raster transform is not north-up. This may lead to incorrect results.",
            UserWarning,
        )
@contextmanager
def open_raster(source, *, affine=None, nodata=None, band=1, crs=None):
    """Yield a rasterio.DatasetReader no matter what the caller passes.

    Parameters
    ----------
    source : str | Path | rasterio.io.DatasetReader | np.ndarray
    affine : Affine, required if `source` is a NumPy array
    nodata : float, optional override (ignored for open readers that already match)
    band : int specifying the band to read
    crs : dict or str, CRS for in-memory arrays

    Yields
    ------
    ds : rasterio.io.DatasetReaderBase
    """
    memfile = None
    base_ds = None
    # close_base tracks ownership: only datasets opened here get closed here.
    close_base = False
    try:
        # already opened reader case: borrow it, never close it.
        if isinstance(source, rio.io.DatasetReaderBase):
            base_ds = source
            close_base = False
        elif isinstance(source, (str, bytes, os.PathLike)):
            base_ds = rio.open(source, "r")
            close_base = True
        # in-memory DatasetReader for raw NumPy arrays
        elif isinstance(source, np.ndarray):
            if affine is None:
                raise ValueError("affine must be provided when source is a NumPy array")
            # Promote 1-D input so shape[0]/shape[1] map to height/width below.
            source = np.atleast_2d(source)
            profile = {
                "driver": "GTiff",
                "height": source.shape[0],
                "width": source.shape[1],
                "count": 1,
                "dtype": source.dtype,
                "crs": crs,
                "transform": affine,
                "nodata": nodata,
            }
            memfile = MemoryFile()
            # Write the array into the in-memory file, then reopen it read-only.
            with memfile.open(**profile) as dst:
                dst.write(source, 1)
            base_ds = memfile.open()
            close_base = True
        else:
            raise TypeError("Unsupported raster input type")

        # If custom stuff is specified, create a VRT
        # Each rewrite_* flag is True only when the override differs from what
        # the base dataset already has, so a matching override costs nothing.
        rewrite_nodata = nodata is not None and nodata != base_ds.nodata
        rewrite_crs = crs is not None and crs != base_ds.crs
        rewrite_transform = affine is not None and affine != base_ds.transform
        rewrite_band = band != 1
        create_vrt = rewrite_nodata or rewrite_crs or rewrite_transform or rewrite_band
        if create_vrt:
            vrt_options = {}
            if rewrite_nodata:
                vrt_options["nodata"] = nodata
            if rewrite_crs:
                vrt_options["crs"] = crs
            if rewrite_transform:
                vrt_options["transform"] = affine
            if rewrite_band:
                # NOTE(review): "band_indexes" is not a documented WarpedVRT
                # keyword; it is forwarded via **warp_extras — confirm rasterio
                # honours it for band selection.
                vrt_options["band_indexes"] = [band]
            ds = WarpedVRT(base_ds, **vrt_options)
            close_ds = True
        else:
            ds = base_ds
            close_ds = False
        yield ds
    finally:
        # Clean up in reverse order
        # locals() guards cover early failures where ds/base_ds were never bound.
        if "ds" in locals() and close_ds:
            ds.close()
        if "base_ds" in locals() and close_base:
            base_ds.close()
        if memfile is not None:
            memfile.close()
def open_vector(
    vectors: Union[
        str,
        Path,  # file path / URL
        BaseGeometry,  # single Shapely geometry
        Sequence[BaseGeometry],  # list/tuple of geometries
        dict,  # GeoJSON-like mapping
        gpd.GeoDataFrame,
    ],
    *,
    layer: Union[int, str] = 0,
    crs=None,
    affine: Affine | None = None,
) -> gpd.GeoDataFrame:
    """Normalise *vectors* into a GeoDataFrame.

    Parameters
    ----------
    vectors : many flavours, see type hint above.
    layer : layer name or index when reading multi-layer files.
    crs : CRS to assign if the input has none (e.g. Shapely geom).
    affine : Affine of the *raster* (optional). If crs is None and you
        pass an Affine with EPSG info (pyproj 3.6+ Affine.crs), that CRS
        will be used. But don't quote me on that, this might fail.

    Returns
    -------
    gdf : GeoDataFrame
        Always contains at least a 'geometry' column and a CRS.

    Raises
    ------
    ValueError
        If a string input cannot be read as a file, WKT, or GeoJSON, or
        if a dict input lacks a 'type' key.
    TypeError
        If *vectors* is of an unsupported type altogether.
    """
    # GeoDataFrame: copy so callers' frames are never mutated.
    if isinstance(vectors, gpd.GeoDataFrame):
        gdf = vectors.copy()

    # String / Path: try file read, then WKT, then GeoJSON text.
    elif isinstance(vectors, (str, Path)):
        try:
            gdf = gpd.read_file(vectors, layer=layer)
            # remember original pathname so later code can emit it if needed
            gdf.attrs["file_path"] = str(vectors)
        except Exception:
            # maybe it's a wkt string?
            try:
                gdf = gpd.GeoDataFrame({"geometry": [wkt.loads(vectors)]}, crs=crs)
            except Exception:
                # maybe it's an unparsed GeoJSON mapping?
                # BUGFIX: this fallback must be its own try/except — the
                # original chained a second `except Exception as e` onto the
                # same try, which is dead code (the first clause matches
                # everything) and let json.loads failures escape un-wrapped.
                try:
                    gdf = gpd.GeoDataFrame.from_features(json.loads(vectors), crs=crs)
                except Exception as e:
                    raise ValueError(f"Cannot read vector data from {vectors!r}: {e}")

    # dict GeoJSON-like mapping
    elif isinstance(vectors, dict):
        if "type" not in vectors:
            raise ValueError("GeoJSON mapping must have a 'type' key")
        gdf = gpd.GeoDataFrame.from_features(vectors, crs=crs)

    # single geometry or sequence of geometries
    elif isinstance(vectors, BaseGeometry) or (
        isinstance(vectors, (list, tuple))
        and all(isinstance(g, BaseGeometry) for g in vectors)
    ):
        geoms = [vectors] if isinstance(vectors, BaseGeometry) else list(vectors)
        gdf = gpd.GeoDataFrame({"geometry": geoms}, crs=crs)

    # i give up
    else:
        raise TypeError(
            "Unsupported vector input. Must be a file path, GeoDataFrame, "
            "Shapely geometry, GeoJSON mapping, or sequence of geometries."
        )

    # Backfill a CRS when the input carried none: explicit `crs` wins,
    # otherwise fall back to a CRS attached to the raster's Affine (if any).
    if gdf.crs is None:
        if crs is None and affine is not None and hasattr(affine, "crs"):
            crs = affine.crs
        if crs is not None:
            gdf.set_crs(crs, inplace=True)
    return gdf