This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 7bf52746ba [SEDONA-720] Refactor GeoPandas Dataframe implementation (#1916)
7bf52746ba is described below
commit 7bf52746baff0a5fa1095ff9a162145149f8234d
Author: Feng Zhang <[email protected]>
AuthorDate: Fri Apr 11 20:06:34 2025 -0700
[SEDONA-720] Refactor GeoPandas Dataframe implementation (#1916)
* [SEDONA-720] Refactor GeoPandas Dataframe implementation
* fix pre-commit error
* fix pyupgrade lint
---
python/sedona/geopandas/geodataframe.py | 146 ++++++++++++++--------------
python/tests/geopandas/test_geodataframe.py | 8 +-
2 files changed, 77 insertions(+), 77 deletions(-)
diff --git a/python/sedona/geopandas/geodataframe.py b/python/sedona/geopandas/geodataframe.py
index e22451f0c5..bc4354e474 100644
--- a/python/sedona/geopandas/geodataframe.py
+++ b/python/sedona/geopandas/geodataframe.py
@@ -30,6 +30,8 @@ from pyspark.pandas._typing import Axis, Dtype, Scalar
from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame
from pyspark.pandas import Series as PandasOnSparkSeries
+from pyspark.pandas.internal import InternalFrame
+
class GeoDataFrame(GeoFrame, pspd.DataFrame):
"""
@@ -136,16 +138,29 @@ class GeoDataFrame(GeoFrame, pspd.DataFrame):
self._col_label: Label
from sedona.geopandas import GeoSeries
+ from pyspark.sql import DataFrame as SparkDataFrame
- if isinstance(
- data, (GeoDataFrame, GeoSeries, PandasOnSparkSeries, PandasOnSparkDataFrame)
- ):
+ if isinstance(data, (GeoDataFrame, GeoSeries)):
assert dtype is None
assert not copy
-
self._anchor = data
self._col_label = index
+ elif isinstance(data, (PandasOnSparkSeries, PandasOnSparkDataFrame)):
+ assert columns is None
+ assert dtype is None
+ assert not copy
+ if index is None:
+ internal = InternalFrame(spark_frame=data._internal.spark_frame)
+ object.__setattr__(self, "_internal_frame", internal)
+ elif isinstance(data, SparkDataFrame):
+ assert columns is None
+ assert dtype is None
+ assert not copy
+ if index is None:
+ internal = InternalFrame(spark_frame=data, index_spark_columns=None)
+ object.__setattr__(self, "_internal_frame", internal)
else:
+ # The branches below handle non-distributed dataframe types
if isinstance(data, pd.DataFrame):
assert index is None
assert dtype is None
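For context on the constructor changes above: the new elif branches anchor the GeoDataFrame directly on a distributed frame through InternalFrame instead of converting to pandas first. A minimal usage sketch, assuming a Sedona-enabled SparkSession (the setup and query below are illustrative, not part of this commit):

    from sedona.spark import SedonaContext
    from sedona.geopandas import GeoDataFrame

    # Hypothetical setup: create a SparkSession with Sedona registered.
    config = SedonaContext.builder().getOrCreate()
    sedona = SedonaContext.create(config)

    # A plain Spark DataFrame with a native geometry column.
    sdf = sedona.sql("SELECT 1 AS id, ST_Point(0.0, 0.0) AS geometry")

    # Taken by the new SparkDataFrame branch: the frame is wrapped in an
    # InternalFrame and stays distributed.
    gdf = GeoDataFrame(sdf)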
@@ -184,6 +199,55 @@ class GeoDataFrame(GeoFrame, pspd.DataFrame):
# Implementation of the abstract method
raise NotImplementedError("This method is not implemented yet.")
+ def _process_geometry_columns(
+ self, operation: str, rename_suffix: str = "", *args, **kwargs
+ ) -> GeoDataFrame:
+ """
+ Helper method to process geometry columns with a specified operation.
+
+ Parameters
+ ----------
+ operation : str
+ The spatial operation to apply (e.g., 'ST_Area', 'ST_Buffer').
+ rename_suffix : str, default ""
+ Suffix to append to the resulting column name.
+ args : tuple
+ Positional arguments for the operation.
+ kwargs : dict
+ Keyword arguments for the operation.
+
+ Returns
+ -------
+ GeoDataFrame
+ A new GeoDataFrame with the operation applied to geometry columns.
+ """
+ select_expressions = []
+
+ for field in self._internal.spark_frame.schema.fields:
+ col_name = field.name
+
+ # Skip index and order columns
+ if col_name in ("__index_level_0__", "__natural_order__"):
+ continue
+
+ if field.dataType.typeName() in ("geometrytype", "binary"):
+ # Prepare arguments for the operation
+ positional_params = ", ".join([repr(v) for v in args])
+ keyword_params = ", ".join([repr(v) for v in kwargs.values()])
+ params = ", ".join(filter(None, [positional_params,
keyword_params]))
+
+ if field.dataType.typeName() == "binary":
+ expr = f"{operation}(ST_GeomFromWKB(`{col_name}`){', ' +
params if params else ''}) as {col_name}{rename_suffix}"
+ else:
+ expr = f"{operation}(`{col_name}`{', ' + params if params
else ''}) as {col_name}{rename_suffix}"
+ select_expressions.append(expr)
+ else:
+ # Keep non-geometry columns as they are
+ select_expressions.append(f"`{col_name}`")
+
+ sdf = self._internal.spark_frame.selectExpr(*select_expressions)
+ return GeoDataFrame(sdf)
+
@property
def dtypes(self) -> gpd.GeoSeries | pd.Series | Dtype:
# Implementation of the abstract method
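To see what select expressions the new helper generates, here is a standalone sketch that mirrors its string-building (pure Python, runnable without Spark; names are illustrative):

    def build_expr(operation, col_name, is_binary, rename_suffix, *args):
        # Mirrors _process_geometry_columns: WKB (binary) columns are decoded
        # with ST_GeomFromWKB first, extra arguments are rendered via repr(),
        # and the result column is aliased with the suffix.
        params = ", ".join(repr(v) for v in args)
        col = f"ST_GeomFromWKB(`{col_name}`)" if is_binary else f"`{col_name}`"
        tail = f", {params}" if params else ""
        return f"{operation}({col}{tail}) as {col_name}{rename_suffix}"

    print(build_expr("ST_Buffer", "geometry1", False, "_buffered", 0.5))
    # ST_Buffer(`geometry1`, 0.5) as geometry1_buffered
    print(build_expr("ST_Area", "geom_wkb", True, "_area"))
    # ST_Area(ST_GeomFromWKB(`geom_wkb`)) as geom_wkb_area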
@@ -256,42 +320,7 @@ class GeoDataFrame(GeoFrame, pspd.DataFrame):
0 1.0 1
1 4.0 2
"""
- # Create a list of all column expressions for the new dataframe
- select_expressions = []
-
- # Process geometry columns to calculate areas
- for field in self._internal.spark_frame.schema.fields:
- col_name = field.name
-
- # Skip index column to avoid duplication
- if col_name == "__index_level_0__" or col_name ==
"__natural_order__":
- continue
-
- if (
- field.dataType.typeName() == "geometrytype"
- or field.dataType.typeName() == "binary"
- ):
- # Calculate the area for each geometry column
- if field.dataType.typeName() == "binary":
- area_expr = (
- f"ST_Area(ST_GeomFromWKB(`{col_name}`)) as
{col_name}_area"
- )
- else:
- area_expr = f"ST_Area(`{col_name}`) as {col_name}_area"
- select_expressions.append(area_expr)
- else:
- # Keep non-geometry columns as they are
- select_expressions.append(f"`{col_name}`")
-
- # Execute the query to get all data in one go
- result_df = self._internal.spark_frame.selectExpr(*select_expressions)
-
- # Convert to pandas DataFrame
- pandas_df = result_df.toPandas()
-
- # Create a new GeoDataFrame with the result
- # Note: This avoids the need to manipulate the index columns separately
- return GeoDataFrame(pandas_df)
+ return self._process_geometry_columns("ST_Area", rename_suffix="_area")
@property
def crs(self):
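With the refactor, area is a one-liner that delegates to the helper: each geometry column comes back as ST_Area(...) aliased with an _area suffix, and non-geometry columns pass through unchanged. A hedged end-to-end sketch matching the docstring example above (sample data is illustrative):

    import pandas as pd
    from shapely.geometry import box
    from sedona.geopandas import GeoDataFrame

    # Two polygons with areas 1.0 and 4.0, as in the docstring output.
    pdf = pd.DataFrame({"geometry": [box(0, 0, 1, 1), box(0, 0, 2, 2)], "id": [1, 2]})
    gdf = GeoDataFrame(pdf)

    # "geometry" is replaced by "geometry_area"; "id" passes through as-is.
    areas = gdf.area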
@@ -553,40 +582,9 @@ class GeoDataFrame(GeoFrame, pspd.DataFrame):
>>> gdf = GeoDataFrame(data)
>>> buffered = gdf.buffer(0.5)
"""
- # Create a list of all column expressions for the new dataframe
- select_expressions = []
-
- # Process each field in the schema
- for field in self._internal.spark_frame.schema.fields:
- col_name = field.name
-
- # Skip index and order columns
- if col_name == "__index_level_0__" or col_name ==
"__natural_order__":
- continue
-
- # Apply buffer to geometry columns
- if (
- field.dataType.typeName() == "geometrytype"
- or field.dataType.typeName() == "binary"
- ):
-
- if field.dataType.typeName() == "binary":
- # For binary geometry columns (WKB)
- buffer_expr = f"ST_Buffer(ST_GeomFromWKB(`{col_name}`),
{distance}) as {col_name}"
- else:
- # For native geometry columns
- buffer_expr = f"ST_Buffer(`{col_name}`, {distance}) as
{col_name}"
- select_expressions.append(buffer_expr)
- else:
- # Keep non-geometry columns as they are
- select_expressions.append(f"`{col_name}`")
-
- # Execute the query to get all data in one go
- result_df = self._internal.spark_frame.selectExpr(*select_expressions)
-
- # Convert to pandas DataFrame and create a new GeoDataFrame
- pandas_df = result_df.toPandas()
- return GeoDataFrame(pandas_df)
+ return self._process_geometry_columns(
+ "ST_Buffer", rename_suffix="_buffered", distance=distance
+ )
def sjoin(
self,
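One behavioral nuance of routing buffer through the helper: it now passes rename_suffix="_buffered", so the result column is renamed, whereas the previous inline implementation kept the original column name. A short sketch of what callers see (illustrative):

    buffered = gdf.buffer(0.5)
    # Geometry columns are aliased with the suffix, e.g.
    # "geometry1" -> "geometry1_buffered"; the test update below asserts this.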
diff --git a/python/tests/geopandas/test_geodataframe.py b/python/tests/geopandas/test_geodataframe.py
index 30fcf24520..0a4f367727 100644
--- a/python/tests/geopandas/test_geodataframe.py
+++ b/python/tests/geopandas/test_geodataframe.py
@@ -162,17 +162,19 @@ class TestDataframe(TestBase):
assert type(buffer_df) is GeoDataFrame
# Verify the original columns are preserved
- assert "geometry1" in buffer_df.columns
+ assert "geometry1_buffered" in buffer_df.columns
assert "id" in buffer_df.columns
assert "value" in buffer_df.columns
# Convert to pandas to extract individual geometries
- pandas_df = buffer_df._internal.spark_frame.select("geometry1").toPandas()
+ pandas_df = buffer_df._internal.spark_frame.select(
+ "geometry1_buffered"
+ ).toPandas()
# Calculate areas to verify buffer was applied correctly
# Point buffer with radius 0.5 should have area approximately π * 0.5² ≈ 0.785
# Square buffer with radius 0.5 should expand the 1x1 square to a 2x2 square with rounded corners
- areas = [geom.area for geom in pandas_df["geometry1"]]
+ areas = [geom.area for geom in pandas_df["geometry1_buffered"]]
# Check that square buffer area is greater than original (1.0)
assert areas[1] > 1.0
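For reference, the expected constants in the comments follow from the buffer geometry: a point buffered by r approximates a disc of area π·r², and a 1x1 square buffered by r gains perimeter·r along the sides plus a quarter-circle at each corner. A quick pure-Python check (independent of Spark):

    import math

    r = 0.5
    point_buffer_area = math.pi * r**2               # ≈ 0.785
    square_buffer_area = 1 + 4 * r + math.pi * r**2  # ≈ 3.785

    assert 0.75 < point_buffer_area < 0.79
    assert square_buffer_area > 1.0  # consistent with the test's assertion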