Source code for pudl.analysis.spatial

"""Spatial operations for demand allocation."""

import itertools
import warnings
from collections.abc import Callable, Iterable
from typing import Literal

import geopandas as gpd  # noqa: ICN002
import pandas as pd
import shapely.ops
from shapely.geometry import GeometryCollection, MultiPolygon, Polygon
from shapely.geometry.base import BaseGeometry


[docs] def check_gdf(gdf: gpd.GeoDataFrame) -> None: """Check that GeoDataFrame contains (Multi)Polygon geometries with non-zero area. Args: gdf: GeoDataFrame. Raises: TypeError: Object is not a GeoDataFrame. AttributeError: GeoDataFrame has no geometry. TypeError: Geometry is not a GeoSeries. ValueError: Geometry contains null geometries. ValueError: Geometry contains non-(Multi)Polygon geometries. ValueError: Geometry contains (Multi)Polygon geometries with zero area. ValueError: MultiPolygon contains Polygon geometries with zero area. """ if not isinstance(gdf, gpd.GeoDataFrame): raise TypeError("Object is not a GeoDataFrame") if not hasattr(gdf, "geometry"): raise AttributeError("GeoDataFrame has no geometry") if not isinstance(gdf.geometry, gpd.GeoSeries): raise TypeError("Geometry is not a GeoSeries") warnings.filterwarnings("ignore", "GeoSeries.isna", UserWarning) if gdf.geometry.isna().any(): raise ValueError("Geometry contains null geometries") if not gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"]).all(): raise ValueError("Geometry contains non-(Multi)Polygon geometries") if not gdf.geometry.area.all(): raise ValueError("Geometry contains (Multi)Polygon geometries with zero area") is_mpoly = gdf.geometry.geom_type == "MultiPolygon" for mpoly in gdf.geometry[is_mpoly]: for poly in mpoly.geoms: if not poly.area: raise ValueError( "MultiPolygon contains Polygon geometries with zero area" )
[docs] def polygonize(geom: BaseGeometry) -> Polygon | MultiPolygon: """Convert geometry to (Multi)Polygon. Args: geom: Geometry to convert to (Multi)Polygon. Returns: Geometry converted to (Multi)Polygon, with all zero-area components removed. Raises: ValueError: Geometry has zero area. """ polys = [] # Explode geometries to polygons if isinstance(geom, GeometryCollection): for g in geom.geoms: if isinstance(g, Polygon): polys.append(g) elif isinstance(g, MultiPolygon): polys.extend(g) elif isinstance(geom, MultiPolygon): polys.extend(geom) elif isinstance(geom, Polygon): polys.append(geom) # Remove zero-area polygons polys = [p for p in polys if p.area] if not polys: raise ValueError("Geometry has zero area") if len(polys) == 1: return polys[0] return MultiPolygon(polys)
[docs] def explode(gdf: gpd.GeoDataFrame, ratios: Iterable[str] = None) -> gpd.GeoDataFrame: """Explode MultiPolygon to multiple Polygon geometries. Args: gdf: GeoDataFrame with non-zero-area (Multi)Polygon geometries. ratios: Names of columns to rescale by the area fraction of the Polygon relative to the MultiPolygon. If provided, MultiPolygon cannot self-intersect. By default, the original value is used unchanged. Raises: ValueError: Geometry contains self-intersecting MultiPolygon. Returns: GeoDataFrame with each Polygon as a separate row in the GeoDataFrame. The index is the number of the source row in the input GeoDataFrame. """ check_gdf(gdf) gdf = gdf.reset_index(drop=True) is_mpoly = gdf.geometry.geom_type == "MultiPolygon" if ratios and is_mpoly.any(): union_area = gdf.geometry[is_mpoly].apply(shapely.ops.unary_union).area if (union_area != gdf.geometry[is_mpoly].area).any(): raise ValueError("Geometry contains self-intersecting MultiPolygon") result = gdf.explode(index_parts=False) if ratios: fraction = ( result.geometry.area.to_numpy() / gdf.geometry.area[result.index].to_numpy() ) result[ratios] = result[ratios].multiply(fraction, axis="index") return result[gdf.columns]
[docs] def self_union(gdf: gpd.GeoDataFrame, ratios: Iterable[str] = None) -> gpd.GeoDataFrame: """Calculate the geometric union of a feature layer with itself. Areas of overlap are split into two or more geometrically-identical features: one for each of the original overlapping features. Each split feature contains the attributes of the original feature. Args: gdf: GeoDataFrame with non-zero-area MultiPolygon geometries. ratios: Names of columns to rescale by the area fraction of the split feature relative to the original. By default, the original value is used unchanged. Returns: GeoDataFrame representing the union of the input features with themselves. Its index contains tuples of the index of the original overlapping features. Raises: NotImplementedError: MultiPolygon geometries are not yet supported. """ check_gdf(gdf) gdf = gdf.reset_index(drop=True) is_mpoly = gdf.geometry.geom_type == "MultiPolygon" if is_mpoly.any(): raise NotImplementedError("MultiPolygon geometries are not yet supported") # Calculate all pairwise intersections # https://nbviewer.jupyter.org/gist/jorisvandenbossche/3a55a16fda9b3c37e0fb48b1d4019e65 pairs = itertools.combinations(gdf.geometry, 2) intersections = gpd.GeoSeries([a.intersection(b) for a, b in pairs]) # Form polygons from the boundaries of the original polygons and their intersections boundaries = pd.concat([gdf.geometry, intersections]).boundary.union_all() polygons = gpd.GeoSeries(shapely.ops.polygonize(boundaries)) # Determine origin of each polygon by a spatial join on representative points points = gpd.GeoDataFrame(geometry=polygons.representative_point()) oids = gpd.sjoin( points, gdf[["geometry"]], how="left", predicate="within", )["index_right"] # Build new dataframe columns = get_data_columns(gdf) df = gpd.GeoDataFrame( data=gdf.loc[oids, columns].reset_index(drop=True), geometry=polygons[oids.index].to_numpy(), ) if ratios: fraction = df.area.to_numpy() / gdf.area[oids].to_numpy() df[ratios] = df[ratios].multiply(fraction, axis="index") # Add original row indices to index df.index = oids.groupby(oids.index).agg(tuple)[oids.index] df.index.name = None # Return with original column order return df[gdf.columns]
[docs] def dissolve( gdf: gpd.GeoDataFrame, by: Iterable[str], func: Callable | str | list | dict, how: ( Literal["union", "first"] | Callable[[gpd.GeoSeries], BaseGeometry] ) = "union", ) -> gpd.GeoDataFrame: """Dissolve layer by aggregating features based on common attributes. Args: gdf: GeoDataFrame with non-empty (Multi)Polygon geometries. by: Names of columns to group features by. func: Aggregation function for data columns (see :meth:`pd.DataFrame.groupby`). how: Aggregation function for geometry column. Either 'union' (:meth:`gpd.GeoSeries.union_all()`), 'first' (first geometry in group), or a function aggregating multiple geometries into one. Returns: GeoDataFrame with dissolved geometry and data columns, and grouping columns set as the index. """ check_gdf(gdf) merges = {"union": lambda x: x.union_all(), "first": lambda x: x.iloc[0]} data = gdf.drop(columns=gdf.geometry.name).groupby(by=by).aggregate(func) geometry = gdf.groupby(by=by, group_keys=False)[gdf.geometry.name].aggregate( merges.get(how, how) ) return gpd.GeoDataFrame(geometry, geometry=gdf.geometry.name, crs=gdf.crs).join( data )
[docs] def overlay( *gdfs: gpd.GeoDataFrame, how: Literal[ "intersection", "union", "identity", "symmetric_difference", "difference" ] = "intersection", ratios: Iterable[str] = None, ) -> gpd.GeoDataFrame: """Overlay multiple layers incrementally. When a feature from one layer overlaps the feature of another layer, the area of overlap is split into two geometrically-identical features: one for each of the original overlapping features. Each split feature contains the attributes of the original feature. TODO: To identify the source of output features, the user can ensure that each layer contains a column to index by. Alternatively, tuples of indices of the overlapping feature from each layer (null if none) could be returned as the index. Args: gdfs: GeoDataFrames with non-empty (Multi)Polygon geometries assumed to contain no self-overlaps (see :func:`self_union`). Names of (non-geometry) columns cannot be used more than once. Any index columns are ignored. how: Spatial overlay method (see :func:`gpd.overlay`). ratios: Names of columns to rescale by the area fraction of the split feature relative to the original. By default, the original value is used unchanged. Raises: ValueError: Duplicate column names in layers. Returns: GeoDataFrame with the geometries and attributes resulting from the overlay. """ for gdf in gdfs: check_gdf(gdf) if ratios is None: ratios = [] # Check for duplicate non-geometry column names seen = set() duplicates = { c for df in gdfs for c in get_data_columns(df) if c in seen or seen.add(c) } if duplicates: raise ValueError(f"Duplicate column names in layers: {duplicates}") # Drop index columns and replace with default index of known name # NOTE: Assumes that default index name not already names of columns keys = [f"__id{i}__" for i in range(len(gdfs))] gdfs = [ df.reset_index(drop=True).rename_axis(k) for df, k in zip(gdfs, keys, strict=True) ] overlay = None for i in range(len(gdfs) - 1): a, b = overlay if i else gdfs[i], gdfs[i + 1] # Perform overlay with geometry and constant fields constants = [ [c for c in df.columns if c == df.geometry.name or c not in ratios] for df in (a, b) ] overlay = gpd.overlay( a[constants[0]].reset_index(), b[constants[1]].reset_index(), how=how ) # For uniform fields, compute area fraction of originals and merge by index # new_value = (old_value / old_area) * new_area dfs = [] for j, df in enumerate((a, b)): df_ratios = [c for c in df.columns if c != df.geometry.name and c in ratios] if df_ratios: dfs.append( df[df_ratios] .div(df.area, axis="index") .reindex(overlay[keys[j]]) .reset_index(drop=True) .mul(overlay.area, axis="index") ) if dfs: # Assumed to be faster than incremental concat overlay = pd.concat([overlay] + dfs, axis="columns") return overlay.drop(columns=keys)
[docs] def get_data_columns(df: pd.DataFrame) -> list: """Return list of columns, ignoring geometry.""" if isinstance(df, gpd.GeoDataFrame) and hasattr(df, "geometry"): return [col for col in df.columns if col != df.geometry.name] return list(df.columns)