This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 0b88512a5c [GH-2108] Geopandas.GeoSeries: Implement `union_all`
aggregation (#2109)
0b88512a5c is described below
commit 0b88512a5cadf70ca297da072467ecdf6f172f77
Author: Peter Nguyen <[email protected]>
AuthorDate: Fri Jul 18 10:17:30 2025 -0700
[GH-2108] Geopandas.GeoSeries: Implement `union_all` aggregation (#2109)
* Implement union_all
* Handle len(self) == 0 explicitly for various tests and other PR feedback
---
python/sedona/geopandas/geoseries.py | 96 +++++++++++++++++++---
python/tests/geopandas/test_geopandas_base.py | 7 ++
python/tests/geopandas/test_geoseries.py | 12 ++-
.../tests/geopandas/test_match_geopandas_series.py | 34 +++++++-
4 files changed, 132 insertions(+), 17 deletions(-)
diff --git a/python/sedona/geopandas/geoseries.py
b/python/sedona/geopandas/geoseries.py
index ee54f8704d..1b0ebe66a3 100644
--- a/python/sedona/geopandas/geoseries.py
+++ b/python/sedona/geopandas/geoseries.py
@@ -199,12 +199,12 @@ class GeoSeries(GeoFrame, pspd.Series):
)
try:
- pdf = s.apply(try_geom_to_ewkb)
+ pd_series = s.apply(try_geom_to_ewkb)
except Exception as e:
raise TypeError(f"Non-geometry column passed to GeoSeries:
{e}")
# initialize the parent class pyspark Series with the pandas Series
- super().__init__(data=pdf)
+ super().__init__(data=pd_series)
# manually set it to binary type
col = next(
@@ -251,6 +251,9 @@ class GeoSeries(GeoFrame, pspd.Series):
"""
from pyproj import CRS
+ if len(self) == 0:
+ return None
+
tmp = self._process_geometry_column("ST_SRID", rename="crs",
returns_geom=False)
ps_series = tmp.take([0])
srid = ps_series.iloc[0]
@@ -395,7 +398,13 @@ class GeoSeries(GeoFrame, pspd.Series):
return result
def _process_geometry_column(
- self, operation: str, rename: str, returns_geom: bool = True, *args,
**kwargs
+ self,
+ operation: str,
+ rename: str,
+ returns_geom: bool = True,
+ is_aggr: bool = False,
+ *args,
+ **kwargs,
) -> Union["GeoSeries", pspd.Series]:
"""
Helper method to process a single geometry column with a specified
operation.
@@ -437,7 +446,7 @@ class GeoSeries(GeoFrame, pspd.Series):
sql_expr = f"{operation}(`{first_col}`{params})"
return self._query_geometry_column(
- sql_expr, first_col, rename, returns_geom=returns_geom
+ sql_expr, first_col, rename, returns_geom=returns_geom,
is_aggr=is_aggr
)
def _query_geometry_column(
@@ -447,6 +456,7 @@ class GeoSeries(GeoFrame, pspd.Series):
rename: str,
df: pyspark.sql.DataFrame = None,
returns_geom: bool = True,
+ is_aggr: bool = False,
) -> Union["GeoSeries", pspd.Series]:
"""
Helper method to query a single geometry column with a specified
operation.
@@ -463,6 +473,8 @@ class GeoSeries(GeoFrame, pspd.Series):
The dataframe to query. If not provided, the internal dataframe
will be used.
returns_geom : bool, default True
If True, the geometry column will be converted back to EWKB format.
+ is_aggr : bool, default False
+ If True, the query is an aggregation query.
Returns
-------
@@ -499,14 +511,25 @@ class GeoSeries(GeoFrame, pspd.Series):
query = f"{query} as `{rename}`"
- # We always select NATURAL_ORDER_COLUMN_NAME, to avoid having to
regenerate it in the result
- # We always select SPARK_DEFAULT_INDEX_NAME, to retain series index
info
- sdf = df.selectExpr(query, SPARK_DEFAULT_INDEX_NAME,
NATURAL_ORDER_COLUMN_NAME)
+ exprs = [query]
+
+ index_spark_columns = []
+ index_fields = []
+ if not is_aggr:
+ # We always select NATURAL_ORDER_COLUMN_NAME, to avoid having to
regenerate it in the result
+ # We always select SPARK_DEFAULT_INDEX_NAME, to retain series
index info
+ exprs.append(SPARK_DEFAULT_INDEX_NAME)
+ exprs.append(NATURAL_ORDER_COLUMN_NAME)
+ index_spark_columns = [scol_for(df, SPARK_DEFAULT_INDEX_NAME)]
+ index_fields = [self._internal.index_fields[0]]
+ # else if is_aggr, we don't select the index columns
+
+ sdf = df.selectExpr(*exprs)
internal = self._internal.copy(
spark_frame=sdf,
- index_fields=[self._internal.index_fields[0]],
- index_spark_columns=[scol_for(sdf, SPARK_DEFAULT_INDEX_NAME)],
+ index_fields=index_fields,
+ index_spark_columns=index_spark_columns,
data_spark_columns=[scol_for(sdf, rename)],
data_fields=[self._internal.data_fields[0].copy(name=rename)],
column_label_names=[(rename,)],
@@ -1319,9 +1342,58 @@ class GeoSeries(GeoFrame, pspd.Series):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
- def union_all(self, method="unary", grid_size=None):
- # Implementation of the abstract method
- raise NotImplementedError("This method is not implemented yet.")
+ def union_all(self, method="unary", grid_size=None) -> BaseGeometry:
+ """Returns a geometry containing the union of all geometries in the
+ ``GeoSeries``.
+
+    Sedona does not support the ``method`` or ``grid_size`` arguments, so the
user does not need to manually
+    decide which algorithm is used.
+
+ Parameters
+ ----------
+ method : str (default ``"unary"``)
+ Not supported in Sedona.
+
+ grid_size : float, default None
+ Not supported in Sedona.
+
+ Examples
+ --------
+
+ >>> from sedona.geopandas import GeoSeries
+ >>> from shapely.geometry import box
+ >>> s = GeoSeries([box(0, 0, 1, 1), box(0, 0, 2, 2)])
+ >>> s
+ 0 POLYGON ((1 0, 1 1, 0 1, 0 0, 1 0))
+ 1 POLYGON ((2 0, 2 2, 0 2, 0 0, 2 0))
+ dtype: geometry
+
+ >>> s.union_all()
+ <POLYGON ((0 1, 0 2, 2 2, 2 0, 1 0, 0 0, 0 1))>
+ """
+ if grid_size is not None:
+ raise NotImplementedError("Sedona does not support the grid_size
argument")
+ if method != "unary":
+ import warnings
+
+ warnings.warn(
+ f"Sedona does not support manually specifying different union
methods. Ignoring non-default method argument of {method}"
+ )
+
+ if len(self) == 0:
+ # While it's not explicitly defined in geopandas docs, this is
what geopandas returns for empty GeoSeries
+ # If it ever changes for some reason, we'll catch that with the
test
+ from shapely.geometry import GeometryCollection
+
+ return GeometryCollection()
+
+ # returns_geom needs to be False here so we don't convert back to EWKB
format.
+ tmp = self._process_geometry_column(
+ "ST_Union_Aggr", rename="union_all", is_aggr=True,
returns_geom=False
+ )
+ ps_series = tmp.take([0])
+ geom = ps_series.iloc[0]
+ return geom
def intersects(
self, other: Union["GeoSeries", BaseGeometry], align: Union[bool,
None] = None
diff --git a/python/tests/geopandas/test_geopandas_base.py
b/python/tests/geopandas/test_geopandas_base.py
index 824a53d1b4..c67af0dd5f 100644
--- a/python/tests/geopandas/test_geopandas_base.py
+++ b/python/tests/geopandas/test_geopandas_base.py
@@ -23,6 +23,7 @@ import pandas as pd
import pyspark.pandas as ps
from pandas.testing import assert_series_equal
from contextlib import contextmanager
+from shapely.geometry.base import BaseGeometry
class TestGeopandasBase(TestBase):
@@ -96,3 +97,9 @@ class TestGeopandasBase(TestBase):
yield
finally:
ps.reset_option("compute.ops_on_diff_frames")
+
+ @classmethod
+ def check_geom_equals(cls, actual: BaseGeometry, expected: BaseGeometry):
+ assert isinstance(actual, BaseGeometry)
+ assert isinstance(expected, BaseGeometry)
+ assert actual.equals(expected)
diff --git a/python/tests/geopandas/test_geoseries.py
b/python/tests/geopandas/test_geoseries.py
index d86226e1f8..625d1b9ffa 100644
--- a/python/tests/geopandas/test_geoseries.py
+++ b/python/tests/geopandas/test_geoseries.py
@@ -33,6 +33,7 @@ from shapely.geometry import (
MultiLineString,
MultiPolygon,
LinearRing,
+ box,
)
from pandas.testing import assert_series_equal
import pytest
@@ -781,7 +782,16 @@ class TestGeoSeries(TestGeopandasBase):
pass
def test_union_all(self):
- pass
+ s = GeoSeries([box(0, 0, 1, 1), box(0, 0, 2, 2)])
+ result = s.union_all()
+ expected = Polygon([(0, 1), (0, 2), (2, 2), (2, 0), (1, 0), (0, 0),
(0, 1)])
+ self.check_geom_equals(result, expected)
+
+ # Empty GeoSeries
+ s = sgpd.GeoSeries([])
+ result = s.union_all()
+ expected = GeometryCollection()
+ self.check_geom_equals(result, expected)
def test_intersects(self):
s = sgpd.GeoSeries(
diff --git a/python/tests/geopandas/test_match_geopandas_series.py
b/python/tests/geopandas/test_match_geopandas_series.py
index acd359bacb..16bf86ad33 100644
--- a/python/tests/geopandas/test_match_geopandas_series.py
+++ b/python/tests/geopandas/test_match_geopandas_series.py
@@ -140,8 +140,14 @@ class TestMatchGeopandasSeries(TestGeopandasBase):
for _, geom in self.geoms:
sgpd_result = GeoSeries(geom)
gpd_result = gpd.GeoSeries(geom)
+ # The below method calls to_geopandas() on sgpd_result, so we
don't do it here
self.check_sgpd_equals_gpd(sgpd_result, gpd_result)
+ # Ensure we have the same result for empty GeoSeries
+ sgpd_series = GeoSeries([])
+ gpd_series = gpd.GeoSeries([])
+ self.check_sgpd_equals_gpd(sgpd_series, gpd_series)
+
def test_psdf(self):
# this is to make sure the spark session works with pandas on spark api
psdf = ps.DataFrame(
@@ -428,7 +434,8 @@ class TestMatchGeopandasSeries(TestGeopandasBase):
def test_is_valid_reason(self):
# is_valid_reason was added in geopandas 1.0.0
if parse_version(gpd.__version__) < parse_version("1.0.0"):
- return
+ pytest.skip("geopandas is_valid_reason requires version 1.0.0 or
higher")
+
data = [
Polygon([(0, 0), (1, 1), (0, 1)]),
Polygon([(0, 0), (1, 1), (1, 0), (0, 1)]), # bowtie geometry
@@ -499,7 +506,7 @@ class TestMatchGeopandasSeries(TestGeopandasBase):
def test_get_geometry(self):
if parse_version(gpd.__version__) < parse_version("1.0.0"):
- return
+ pytest.skip("geopandas get_geometry requires version 1.0.0 or
higher")
for _, geom in self.geoms:
# test negative index, in-bounds index, and out of bounds index
@@ -591,7 +598,8 @@ class TestMatchGeopandasSeries(TestGeopandasBase):
# 'structure' method requires shapely >= 2.1.0
if shapely.__version__ < "2.1.0":
- return
+ pytest.skip("geopandas make_valid requires shapely >= 2.1.0")
+
for _, geom in self.geoms:
sgpd_result = GeoSeries(geom).make_valid(method="structure")
gpd_result = gpd.GeoSeries(geom).make_valid(method="structure")
@@ -634,7 +642,20 @@ class TestMatchGeopandasSeries(TestGeopandasBase):
pass
def test_union_all(self):
- pass
+ if parse_version(gpd.__version__) < parse_version("1.1.0"):
+ pytest.skip("geopandas union_all requires version 1.1.0 or higher")
+
+ # Union all the valid geometries
+ # Neither our nor geopandas' implementation supports invalid geometries
+ lst = [g for _, geom in self.geoms for g in geom if g.is_valid]
+ sgpd_result = GeoSeries(lst).union_all()
+ gpd_result = gpd.GeoSeries(lst).union_all()
+ self.check_geom_equals(sgpd_result, gpd_result)
+
+ # Ensure we have the same result for empty GeoSeries
+ sgpd_result = GeoSeries([]).union_all()
+ gpd_result = gpd.GeoSeries([]).union_all()
+ self.check_geom_equals(sgpd_result, gpd_result)
def test_intersects(self):
for _, geom in self.geoms:
@@ -719,3 +740,8 @@ class TestMatchGeopandasSeries(TestGeopandasBase):
sgpd_series = sgpd_series.set_crs(epsg=3857, allow_override=True)
gpd_series = gpd_series.set_crs(epsg=3857, allow_override=True)
assert sgpd_series.crs == gpd_series.crs
+
+ # Ensure we have the same result for empty GeoSeries
+ sgpd_series = GeoSeries([])
+ gpd_series = gpd.GeoSeries([])
+ assert sgpd_series.crs == gpd_series.crs