"""Spatial operations for demand allocation."""
import itertools
import warnings
from collections.abc import Callable, Iterable
from typing import Literal
import geopandas as gpd # noqa: ICN002
import pandas as pd
import shapely.ops
from shapely.geometry import GeometryCollection, MultiPolygon, Polygon
from shapely.geometry.base import BaseGeometry
[docs]
def check_gdf(gdf: gpd.GeoDataFrame) -> None:
"""Check that GeoDataFrame contains (Multi)Polygon geometries with non-zero area.
Args:
gdf: GeoDataFrame.
Raises:
TypeError: Object is not a GeoDataFrame.
AttributeError: GeoDataFrame has no geometry.
TypeError: Geometry is not a GeoSeries.
ValueError: Geometry contains null geometries.
ValueError: Geometry contains non-(Multi)Polygon geometries.
ValueError: Geometry contains (Multi)Polygon geometries with zero area.
ValueError: MultiPolygon contains Polygon geometries with zero area.
"""
if not isinstance(gdf, gpd.GeoDataFrame):
raise TypeError("Object is not a GeoDataFrame")
if not hasattr(gdf, "geometry"):
raise AttributeError("GeoDataFrame has no geometry")
if not isinstance(gdf.geometry, gpd.GeoSeries):
raise TypeError("Geometry is not a GeoSeries")
warnings.filterwarnings("ignore", "GeoSeries.isna", UserWarning)
if gdf.geometry.isna().any():
raise ValueError("Geometry contains null geometries")
if not gdf.geometry.geom_type.isin(["Polygon", "MultiPolygon"]).all():
raise ValueError("Geometry contains non-(Multi)Polygon geometries")
if not gdf.geometry.area.all():
raise ValueError("Geometry contains (Multi)Polygon geometries with zero area")
is_mpoly = gdf.geometry.geom_type == "MultiPolygon"
for mpoly in gdf.geometry[is_mpoly]:
for poly in mpoly.geoms:
if not poly.area:
raise ValueError(
"MultiPolygon contains Polygon geometries with zero area"
)
[docs]
def polygonize(geom: BaseGeometry) -> Polygon | MultiPolygon:
"""Convert geometry to (Multi)Polygon.
Args:
geom: Geometry to convert to (Multi)Polygon.
Returns:
Geometry converted to (Multi)Polygon, with all zero-area components removed.
Raises:
ValueError: Geometry has zero area.
"""
polys = []
# Explode geometries to polygons
if isinstance(geom, GeometryCollection):
for g in geom.geoms:
if isinstance(g, Polygon):
polys.append(g)
elif isinstance(g, MultiPolygon):
polys.extend(g)
elif isinstance(geom, MultiPolygon):
polys.extend(geom)
elif isinstance(geom, Polygon):
polys.append(geom)
# Remove zero-area polygons
polys = [p for p in polys if p.area]
if not polys:
raise ValueError("Geometry has zero area")
if len(polys) == 1:
return polys[0]
return MultiPolygon(polys)
[docs]
def explode(gdf: gpd.GeoDataFrame, ratios: Iterable[str] = None) -> gpd.GeoDataFrame:
"""Explode MultiPolygon to multiple Polygon geometries.
Args:
gdf: GeoDataFrame with non-zero-area (Multi)Polygon geometries.
ratios: Names of columns to rescale by the area fraction of the Polygon
relative to the MultiPolygon.
If provided, MultiPolygon cannot self-intersect.
By default, the original value is used unchanged.
Raises:
ValueError: Geometry contains self-intersecting MultiPolygon.
Returns:
GeoDataFrame with each Polygon as a separate row in the GeoDataFrame.
The index is the number of the source row in the input GeoDataFrame.
"""
check_gdf(gdf)
gdf = gdf.reset_index(drop=True)
is_mpoly = gdf.geometry.geom_type == "MultiPolygon"
if ratios and is_mpoly.any():
union_area = gdf.geometry[is_mpoly].apply(shapely.ops.unary_union).area
if (union_area != gdf.geometry[is_mpoly].area).any():
raise ValueError("Geometry contains self-intersecting MultiPolygon")
result = gdf.explode(index_parts=False)
if ratios:
fraction = (
result.geometry.area.to_numpy() / gdf.geometry.area[result.index].to_numpy()
)
result[ratios] = result[ratios].multiply(fraction, axis="index")
return result[gdf.columns]
[docs]
def self_union(gdf: gpd.GeoDataFrame, ratios: Iterable[str] = None) -> gpd.GeoDataFrame:
"""Calculate the geometric union of a feature layer with itself.
Areas of overlap are split into two or more geometrically-identical features:
one for each of the original overlapping features.
Each split feature contains the attributes of the original feature.
Args:
gdf: GeoDataFrame with non-zero-area MultiPolygon geometries.
ratios: Names of columns to rescale by the area fraction of the split feature
relative to the original. By default, the original value is used unchanged.
Returns:
GeoDataFrame representing the union of the input features with themselves.
Its index contains tuples of the index of the original overlapping features.
Raises:
NotImplementedError: MultiPolygon geometries are not yet supported.
"""
check_gdf(gdf)
gdf = gdf.reset_index(drop=True)
is_mpoly = gdf.geometry.geom_type == "MultiPolygon"
if is_mpoly.any():
raise NotImplementedError("MultiPolygon geometries are not yet supported")
# Calculate all pairwise intersections
# https://nbviewer.jupyter.org/gist/jorisvandenbossche/3a55a16fda9b3c37e0fb48b1d4019e65
pairs = itertools.combinations(gdf.geometry, 2)
intersections = gpd.GeoSeries([a.intersection(b) for a, b in pairs])
# Form polygons from the boundaries of the original polygons and their intersections
boundaries = pd.concat([gdf.geometry, intersections]).boundary.union_all()
polygons = gpd.GeoSeries(shapely.ops.polygonize(boundaries))
# Determine origin of each polygon by a spatial join on representative points
points = gpd.GeoDataFrame(geometry=polygons.representative_point())
oids = gpd.sjoin(
points,
gdf[["geometry"]],
how="left",
predicate="within",
)["index_right"]
# Build new dataframe
columns = get_data_columns(gdf)
df = gpd.GeoDataFrame(
data=gdf.loc[oids, columns].reset_index(drop=True),
geometry=polygons[oids.index].to_numpy(),
)
if ratios:
fraction = df.area.to_numpy() / gdf.area[oids].to_numpy()
df[ratios] = df[ratios].multiply(fraction, axis="index")
# Add original row indices to index
df.index = oids.groupby(oids.index).agg(tuple)[oids.index]
df.index.name = None
# Return with original column order
return df[gdf.columns]
[docs]
def dissolve(
gdf: gpd.GeoDataFrame,
by: Iterable[str],
func: Callable | str | list | dict,
how: (
Literal["union", "first"] | Callable[[gpd.GeoSeries], BaseGeometry]
) = "union",
) -> gpd.GeoDataFrame:
"""Dissolve layer by aggregating features based on common attributes.
Args:
gdf: GeoDataFrame with non-empty (Multi)Polygon geometries.
by: Names of columns to group features by.
func: Aggregation function for data columns (see :meth:`pd.DataFrame.groupby`).
how: Aggregation function for geometry column.
Either 'union' (:meth:`gpd.GeoSeries.union_all()`),
'first' (first geometry in group),
or a function aggregating multiple geometries into one.
Returns:
GeoDataFrame with dissolved geometry and data columns,
and grouping columns set as the index.
"""
check_gdf(gdf)
merges = {"union": lambda x: x.union_all(), "first": lambda x: x.iloc[0]}
data = gdf.drop(columns=gdf.geometry.name).groupby(by=by).aggregate(func)
geometry = gdf.groupby(by=by, group_keys=False)[gdf.geometry.name].aggregate(
merges.get(how, how)
)
return gpd.GeoDataFrame(geometry, geometry=gdf.geometry.name, crs=gdf.crs).join(
data
)
[docs]
def overlay(
*gdfs: gpd.GeoDataFrame,
how: Literal[
"intersection", "union", "identity", "symmetric_difference", "difference"
] = "intersection",
ratios: Iterable[str] = None,
) -> gpd.GeoDataFrame:
"""Overlay multiple layers incrementally.
When a feature from one layer overlaps the feature of another layer,
the area of overlap is split into two geometrically-identical features:
one for each of the original overlapping features.
Each split feature contains the attributes of the original feature.
TODO: To identify the source of output features, the user can ensure that each
layer contains a column to index by.
Alternatively, tuples of indices of the overlapping
feature from each layer (null if none) could be returned as the index.
Args:
gdfs: GeoDataFrames with non-empty (Multi)Polygon geometries
assumed to contain no self-overlaps (see :func:`self_union`).
Names of (non-geometry) columns cannot be used more than once.
Any index columns are ignored.
how: Spatial overlay method (see :func:`gpd.overlay`).
ratios: Names of columns to rescale by the area fraction of the split feature
relative to the original. By default, the original value is used unchanged.
Raises:
ValueError: Duplicate column names in layers.
Returns:
GeoDataFrame with the geometries and attributes resulting from the overlay.
"""
for gdf in gdfs:
check_gdf(gdf)
if ratios is None:
ratios = []
# Check for duplicate non-geometry column names
seen = set()
duplicates = {
c for df in gdfs for c in get_data_columns(df) if c in seen or seen.add(c)
}
if duplicates:
raise ValueError(f"Duplicate column names in layers: {duplicates}")
# Drop index columns and replace with default index of known name
# NOTE: Assumes that default index name not already names of columns
keys = [f"__id{i}__" for i in range(len(gdfs))]
gdfs = [
df.reset_index(drop=True).rename_axis(k)
for df, k in zip(gdfs, keys, strict=True)
]
overlay = None
for i in range(len(gdfs) - 1):
a, b = overlay if i else gdfs[i], gdfs[i + 1]
# Perform overlay with geometry and constant fields
constants = [
[c for c in df.columns if c == df.geometry.name or c not in ratios]
for df in (a, b)
]
overlay = gpd.overlay(
a[constants[0]].reset_index(), b[constants[1]].reset_index(), how=how
)
# For uniform fields, compute area fraction of originals and merge by index
# new_value = (old_value / old_area) * new_area
dfs = []
for j, df in enumerate((a, b)):
df_ratios = [c for c in df.columns if c != df.geometry.name and c in ratios]
if df_ratios:
dfs.append(
df[df_ratios]
.div(df.area, axis="index")
.reindex(overlay[keys[j]])
.reset_index(drop=True)
.mul(overlay.area, axis="index")
)
if dfs:
# Assumed to be faster than incremental concat
overlay = pd.concat([overlay] + dfs, axis="columns")
return overlay.drop(columns=keys)
[docs]
def get_data_columns(df: pd.DataFrame) -> list:
"""Return list of columns, ignoring geometry."""
if isinstance(df, gpd.GeoDataFrame) and hasattr(df, "geometry"):
return [col for col in df.columns if col != df.geometry.name]
return list(df.columns)