This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new f8fe6f38d4 [GH-2037] Implement _row_wise_operation + intersection,
intersect (#2038)
f8fe6f38d4 is described below
commit f8fe6f38d4e93de82f1ce7dddad93f910f1f0312
Author: Peter Nguyen <[email protected]>
AuthorDate: Wed Jul 2 14:56:29 2025 -0700
[GH-2037] Implement _row_wise_operation + intersection, intersect (#2038)
* Refactor process_geometry_column to create a more flexible
query_geometry_column()
* Implement length()
* Implement intersection
* Implement intersects
* Add rename field to _row_wise_operation and provide select instead of
operation
* Replace PS_INDEX_COL w/ imported SPARK_DEFAULT_INDEX_NAME
---
python/sedona/geopandas/geoseries.py | 250 ++++++++++++++++++++-
python/tests/geopandas/test_geoseries.py | 79 +++++++
.../tests/geopandas/test_match_geopandas_series.py | 25 +++
3 files changed, 344 insertions(+), 10 deletions(-)
diff --git a/python/sedona/geopandas/geoseries.py
b/python/sedona/geopandas/geoseries.py
index 5844b3b21d..cf17eb31a2 100644
--- a/python/sedona/geopandas/geoseries.py
+++ b/python/sedona/geopandas/geoseries.py
@@ -17,11 +17,12 @@
import os
import typing
-from typing import Any, Union, Literal
+from typing import Any, Union, Literal, List
import geopandas as gpd
import pandas as pd
import pyspark.pandas as pspd
+import pyspark
from pyspark.pandas import Series as PandasOnSparkSeries
from pyspark.pandas._typing import Dtype
from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame
@@ -31,12 +32,15 @@ from pyspark.pandas.utils import scol_for, log_advice
from pyspark.sql.types import BinaryType
import shapely
+from shapely.geometry.base import BaseGeometry
from sedona.geopandas._typing import Label
from sedona.geopandas.base import GeoFrame
from sedona.geopandas.geodataframe import GeoDataFrame
from sedona.geopandas.geoindex import GeoIndex
+from pyspark.pandas.internal import SPARK_DEFAULT_INDEX_NAME #
__index_level_0__
+
class GeoSeries(GeoFrame, pspd.Series):
"""
@@ -380,7 +384,11 @@ class GeoSeries(GeoFrame, pspd.Series):
return self._query_geometry_column(sql_expr, first_col, rename)
def _query_geometry_column(
- self, query: str, col: Union[str, None], rename: str
+ self,
+ query: str,
+ cols: Union[List[str], str],
+ rename: str,
+ df: pyspark.sql.DataFrame = None,
) -> "GeoSeries":
"""
Helper method to query a single geometry column with a specified
operation.
@@ -389,28 +397,38 @@ class GeoSeries(GeoFrame, pspd.Series):
----------
query : str
The query to apply to the geometry column.
- col : str
- The name of the column to query.
+ cols : List[str] or str
+ The names of the columns to query.
rename : str
The name of the resulting column.
+ df : pyspark.sql.DataFrame
+ The dataframe to query. If not provided, the internal dataframe
will be used.
Returns
-------
GeoSeries
A GeoSeries with the operation applied to the geometry column.
"""
- if not col:
+ if not cols:
raise ValueError("No valid geometry column found.")
- data_type = self._internal.spark_frame.schema[col].dataType
+ if isinstance(cols, str):
+ cols = [cols]
+
+ df = self._internal.spark_frame if df is None else df
- if isinstance(data_type, BinaryType):
- # the backticks here are important so we don't match strings that
happen to be the same as the column name
- query = query.replace(f"`{col}`", f"ST_GeomFromWKB(`{col}`)")
+ for col in cols:
+ data_type = df.schema[col].dataType
+
+ rename = col if not rename else rename
+
+ if isinstance(data_type, BinaryType):
+ # the backticks here are important so we don't match strings
that happen to be the same as the column name
+ query = query.replace(f"`{col}`", f"ST_GeomFromWKB(`{col}`)")
sql_expr = f"{query} as `{rename}`"
- sdf = self._internal.spark_frame.selectExpr(sql_expr)
+ sdf = df.selectExpr(sql_expr)
internal = InternalFrame(
spark_frame=sdf,
index_spark_columns=None,
@@ -778,6 +796,218 @@ class GeoSeries(GeoFrame, pspd.Series):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
+ def intersects(
+ self, other: Union["GeoSeries", BaseGeometry], align: Union[bool,
None] = None
+ ) -> pspd.Series:
+ """Returns a ``Series`` of ``dtype('bool')`` with value ``True`` for
+ each aligned geometry that intersects `other`.
+
+ An object is said to intersect `other` if its `boundary` and `interior`
+ intersects in any way with those of the other.
+
+ The operation works on a 1-to-1 row-wise manner:
+
+ Parameters
+ ----------
+ other : GeoSeries or geometric object
+ The GeoSeries (elementwise) or geometric object to test for
+ intersection with each geometry.
+ align : bool | None (default None)
+ If True, automatically aligns GeoSeries based on their indices.
None defaults to True.
+ If False, the order of elements is preserved. (not supported in
Sedona Geopandas)
+
+ Returns
+ -------
+ Series (bool)
+
+ Examples
+ --------
+ >>> from shapely.geometry import Polygon, LineString, Point
+ >>> s = geopandas.GeoSeries(
+ ... [
+ ... Polygon([(0, 0), (2, 2), (0, 2)]),
+ ... LineString([(0, 0), (2, 2)]),
+ ... LineString([(2, 0), (0, 2)]),
+ ... Point(0, 1),
+ ... ],
+ ... )
+ >>> s2 = geopandas.GeoSeries(
+ ... [
+ ... LineString([(1, 0), (1, 3)]),
+ ... LineString([(2, 0), (0, 2)]),
+ ... Point(1, 1),
+ ... Point(-100, -100),
+ ... ],
+ ... index=range(1, 5),
+ ... )
+
+ We can check two GeoSeries against each other, row by row.
+ The GeoSeries above have different indices. We align both GeoSeries
+ based on index values and compare elements with the same index:
+
+ >>> s.intersects(s2)
+ 0 True
+ 1 True
+ 2 True
+ 3 False
+ dtype: bool
+
+ We can also check if each geometry of GeoSeries intersects a single
+ geometry:
+
+ >>> line = LineString([(-1, 1), (3, 1)])
+ >>> s.intersects(line)
+ 0 True
+ 1 True
+ 2 True
+ 3 True
+ dtype: bool
+
+ Notes
+ -----
+ This method works in a row-wise manner. It does not check if an element
+ of one GeoSeries ``intersects`` *any* element of the other one.
+
+ See also
+ --------
+ GeoSeries.disjoint
+ GeoSeries.crosses
+ GeoSeries.touches
+ GeoSeries.intersection
+ """
+ return (
+ self._row_wise_operation(
+ "ST_Intersects(`L`, `R`)", other, align, rename="intersects"
+ )
+ .to_spark_pandas()
+ .astype("bool")
+ )
+
+ def intersection(
+ self, other: Union["GeoSeries", BaseGeometry], align: Union[bool,
None] = None
+ ) -> "GeoSeries":
+ """Returns a ``GeoSeries`` of the intersection of points in each
+ aligned geometry with `other`.
+
+ The operation works on a 1-to-1 row-wise manner:
+
+ Parameters
+ ----------
+ other : GeoSeries or geometric object
+ The GeoSeries (elementwise) or geometric object to find the
+ intersection with.
+ align : bool | None (default None)
+ If True, automatically aligns GeoSeries based on their indices.
None defaults to True.
+ If False, the order of elements is preserved. (not supported in
Sedona Geopandas)
+
+ Returns
+ -------
+ GeoSeries
+
+ Examples
+ --------
+ >>> from shapely.geometry import Polygon, LineString, Point
+ >>> s = geopandas.GeoSeries(
+ ... [
+ ... Polygon([(0, 0), (2, 2), (0, 2)]),
+ ... Polygon([(0, 0), (2, 2), (0, 2)]),
+ ... LineString([(0, 0), (2, 2)]),
+ ... LineString([(2, 0), (0, 2)]),
+ ... Point(0, 1),
+ ... ],
+ ... )
+ >>> s2 = geopandas.GeoSeries(
+ ... [
+ ... Polygon([(0, 0), (1, 1), (0, 1)]),
+ ... LineString([(1, 0), (1, 3)]),
+ ... LineString([(2, 0), (0, 2)]),
+ ... Point(1, 1),
+ ... Point(-100, -100),
+ ... ],
+ ... )
+
+ We can do an intersection of each geometry and a single
+ shapely geometry:
+
+ >>> geom = Polygon([(-0.5, -0.5), (-0.5, 2.5), (2.5, 2.5), (2.5,
-0.5), (-0.5, -0.5)])
+ >>> s.intersection(geom)
+ Polygon([(0, 0), (2, 2), (0, 2)]),
+ Polygon([(0, 0), (2, 2), (0, 2)]),
+ LineString([(0, 0), (2, 2)]),
+ LineString([(2, 0), (0, 2)]),
+ Point(0, 1),
+ dtype: geometry
+
+ >>> geom = Polygon([(-0.5, -0.5), (-0.5, 2.5), (2.5, 2.5), (2.5,
-0.5), (-0.5, -0.5)])
+ >>> s.intersection(geom)
+ 0 POLYGON ((0 0, 2 2, 0 2))
+ 1 POLYGON ((0 0, 2 2, 0 2))
+ 2 LINESTRING (0 0, 2 2)
+ 3 LINESTRING (2 0, 0 2)
+ 4 POINT (0 1)
+ dtype: geometry
+
+ We can also check two GeoSeries against each other, row by row.
+ The GeoSeries above share the same default index, so aligning on index
+ values pairs up the elements at the same position.
+
+ >>> s.intersection(s2)
+ 0 POLYGON ((0 0, 1 1, 0 1, 0 0))
+ 1 LINESTRING (1 1, 1 2)
+ 2 POINT (1 1)
+ 3 POINT (1 1)
+ 4 POLYGON EMPTY
+ dtype: geometry
+
+ See Also
+ --------
+ GeoSeries.difference
+ GeoSeries.symmetric_difference
+ GeoSeries.union
+ """
+ return self._row_wise_operation(
+ "ST_Intersection(`L`, `R`)", other, align, rename="intersection"
+ )
+
+ def _row_wise_operation(
+ self,
+ select: str,
+ other: Union["GeoSeries", BaseGeometry],
+ align: Union[bool, None],
+ rename: str,
+ ):
+ """
+ Helper function to perform a row-wise operation on two GeoSeries.
+ The self column and other column are aliased to `L` and `R`,
respectively.
+ """
+ from pyspark.sql.functions import col
+
+ # Note: only an explicit False is rejected. None is accepted because it
defaults to True, as in geopandas
+ if align is False:
+ raise NotImplementedError("Sedona Geopandas does not support
align=False")
+
+ if isinstance(other, BaseGeometry):
+ other = GeoSeries([other] * len(self))
+
+ assert isinstance(other, GeoSeries), f"Invalid type for other:
{type(other)}"
+
+ # TODO: this does not yet support multi-index
+ df = self._internal.spark_frame.select(
+ col(self.get_first_geometry_column()).alias("L"),
+ col(SPARK_DEFAULT_INDEX_NAME),
+ )
+ other_df = other._internal.spark_frame.select(
+ col(other.get_first_geometry_column()).alias("R"),
+ col(SPARK_DEFAULT_INDEX_NAME),
+ )
+ joined_df = df.join(other_df, on=SPARK_DEFAULT_INDEX_NAME, how="outer")
+ return self._query_geometry_column(
+ select,
+ cols=["L", "R"],
+ rename=rename,
+ df=joined_df,
+ )
+
def intersection_all(self):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
diff --git a/python/tests/geopandas/test_geoseries.py
b/python/tests/geopandas/test_geoseries.py
index cc1b3338b3..8c0805b5f8 100644
--- a/python/tests/geopandas/test_geoseries.py
+++ b/python/tests/geopandas/test_geoseries.py
@@ -50,6 +50,8 @@ class TestGeoSeries(TestBase):
assert len(actual) == len(expected)
sgpd_result = actual.to_geopandas()
for a, e in zip(sgpd_result, expected):
+ if a.is_empty and e.is_empty:
+ continue
self.assert_geometry_almost_equal(a, e)
def test_area(self):
@@ -322,6 +324,83 @@ class TestGeoSeries(TestBase):
def test_union_all(self):
pass
+ def test_intersects(self):
+ s = sgpd.GeoSeries(
+ [
+ Polygon([(0, 0), (2, 2), (0, 2)]),
+ LineString([(0, 0), (2, 2)]),
+ LineString([(2, 0), (0, 2)]),
+ Point(0, 1),
+ ],
+ )
+ s2 = sgpd.GeoSeries(
+ [
+ LineString([(1, 0), (1, 3)]),
+ LineString([(2, 0), (0, 2)]),
+ Point(1, 1),
+ Point(-100, -100),
+ ],
+ )
+
+ result = s.intersects(s2)
+ expected = pd.Series([True, True, True, False])
+ assert_series_equal(result.to_pandas(), expected)
+
+ line = LineString([(-1, 1), (3, 1)])
+ result = s.intersects(line)
+ expected = pd.Series([True, True, True, True])
+ assert_series_equal(result.to_pandas(), expected)
+
+ def test_intersection(self):
+ s = sgpd.GeoSeries(
+ [
+ Polygon([(0, 0), (2, 2), (0, 2)]),
+ Polygon([(0, 0), (2, 2), (0, 2)]),
+ LineString([(0, 0), (2, 2)]),
+ LineString([(2, 0), (0, 2)]),
+ Point(0, 1),
+ ],
+ )
+
+ geom = Polygon(
+ [(-0.5, -0.5), (-0.5, 2.5), (2.5, 2.5), (2.5, -0.5), (-0.5, -0.5)]
+ )
+ result = s.intersection(geom)
+ expected = gpd.GeoSeries(
+ [
+ Polygon([(0, 0), (2, 2), (0, 2)]),
+ Polygon([(0, 0), (2, 2), (0, 2)]),
+ LineString([(0, 0), (2, 2)]),
+ LineString([(2, 0), (0, 2)]),
+ Point(0, 1),
+ ]
+ )
+ self.check_sgpd_equals_gpd(result, expected)
+
+ s2 = sgpd.GeoSeries(
+ [
+ Polygon([(0, 0), (1, 1), (0, 1)]),
+ LineString([(1, 0), (1, 3)]),
+ LineString([(2, 0), (0, 2)]),
+ Point(1, 1),
+ Point(-100, -100),
+ ],
+ )
+ result = s.intersection(s2)
+ expected = gpd.GeoSeries(
+ [
+ Polygon([(0, 0), (0, 1), (1, 1), (0, 0)]),
+ LineString([(1, 1), (1, 2)]),
+ Point(1, 1),
+ Point(1, 1),
+ Point(),
+ ]
+ )
+ self.check_sgpd_equals_gpd(result, expected)
+
+ with pytest.raises(NotImplementedError):
+ s.intersection(s2, align=False)
+
def test_intersection_all(self):
pass
diff --git a/python/tests/geopandas/test_match_geopandas_series.py
b/python/tests/geopandas/test_match_geopandas_series.py
index c55db097db..ec89ba23bd 100644
--- a/python/tests/geopandas/test_match_geopandas_series.py
+++ b/python/tests/geopandas/test_match_geopandas_series.py
@@ -435,6 +435,28 @@ class TestMatchGeopandasSeries(TestBase):
def test_union_all(self):
pass
+ def test_intersects(self):
+ for _, geom in self.geoms:
+ for _, geom2 in self.geoms:
+ sgpd_result = GeoSeries(geom).intersects(GeoSeries(geom2))
+ gpd_result =
gpd.GeoSeries(geom).intersects(gpd.GeoSeries(geom2))
+ self.check_pd_series_equal(sgpd_result, gpd_result)
+
+ def test_intersection(self):
+ geometries = [
+ Polygon([(0, 0), (1, 0), (1, 1)]),
+ Polygon([(2, 0), (3, 0), (3, 1)]),
+ Polygon([(0, 0), (1, 0), (1, 1), (0, 1)]),
+ Polygon([(0, 0), (3, 0), (3, 3), (0, 2)]),
+ Polygon([(2, 0), (3, 0), (3, 3), (2, 3)]),
+ Point(0, 0),
+ ]
+ for g1 in geometries:
+ for g2 in geometries:
+ sgpd_result = GeoSeries(g1).intersection(GeoSeries(g2))
+ gpd_result = gpd.GeoSeries(g1).intersection(gpd.GeoSeries(g2))
+ self.check_sgpd_equals_gpd(sgpd_result, gpd_result)
+
def test_intersection_all(self):
pass
@@ -478,6 +500,9 @@ class TestMatchGeopandasSeries(TestBase):
assert isinstance(expected, gpd.GeoSeries)
sgpd_result = actual.to_geopandas()
for a, e in zip(sgpd_result, expected):
+ # Sometimes sedona and geopandas both return empty geometries but
of different types (e.g. Point and Polygon)
+ if a.is_empty and e.is_empty:
+ continue
self.assert_geometry_almost_equal(
a, e, tolerance=1e-2
) # increased tolerance from 1e-6