This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bf52746ba [SEDONA-720] Refactor GeoPandas Dataframe implementation (#1916)
7bf52746ba is described below

commit 7bf52746baff0a5fa1095ff9a162145149f8234d
Author: Feng Zhang <[email protected]>
AuthorDate: Fri Apr 11 20:06:34 2025 -0700

    [SEDONA-720] Refactor GeoPandas Dataframe implementation (#1916)
    
    * [SEDONA-720] Refactor GeoPandas Dataframe implementation
    
    * fix pre-commit error
    
    * fix pyupgrade lint
---
 python/sedona/geopandas/geodataframe.py     | 146 ++++++++++++++--------------
 python/tests/geopandas/test_geodataframe.py |   8 +-
 2 files changed, 77 insertions(+), 77 deletions(-)

diff --git a/python/sedona/geopandas/geodataframe.py b/python/sedona/geopandas/geodataframe.py
index e22451f0c5..bc4354e474 100644
--- a/python/sedona/geopandas/geodataframe.py
+++ b/python/sedona/geopandas/geodataframe.py
@@ -30,6 +30,8 @@ from pyspark.pandas._typing import Axis, Dtype, Scalar
 from pyspark.pandas.frame import DataFrame as PandasOnSparkDataFrame
 from pyspark.pandas import Series as PandasOnSparkSeries
 
+from pyspark.pandas.internal import InternalFrame
+
 
 class GeoDataFrame(GeoFrame, pspd.DataFrame):
     """
@@ -136,16 +138,29 @@ class GeoDataFrame(GeoFrame, pspd.DataFrame):
         self._col_label: Label
 
         from sedona.geopandas import GeoSeries
+        from pyspark.sql import DataFrame as SparkDataFrame
 
-        if isinstance(
-            data, (GeoDataFrame, GeoSeries, PandasOnSparkSeries, PandasOnSparkDataFrame)
-        ):
+        if isinstance(data, (GeoDataFrame, GeoSeries)):
             assert dtype is None
             assert not copy
-
             self._anchor = data
             self._col_label = index
+        elif isinstance(data, (PandasOnSparkSeries, PandasOnSparkDataFrame)):
+            assert columns is None
+            assert dtype is None
+            assert not copy
+            if index is None:
+                internal = InternalFrame(spark_frame=data._internal.spark_frame)
+                object.__setattr__(self, "_internal_frame", internal)
+        elif isinstance(data, SparkDataFrame):
+            assert columns is None
+            assert dtype is None
+            assert not copy
+            if index is None:
+                internal = InternalFrame(spark_frame=data, index_spark_columns=None)
+                object.__setattr__(self, "_internal_frame", internal)
         else:
+            # Remaining branches handle non-distributed dataframe types
             if isinstance(data, pd.DataFrame):
                 assert index is None
                 assert dtype is None
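
The two new branches let the constructor wrap a pandas-on-Spark object or a plain Spark DataFrame directly, without a round trip through pandas. A minimal sketch of the Spark DataFrame path (assuming a Sedona-enabled SparkSession named `spark`; the query below is illustrative, not from this commit):

    from sedona.geopandas import GeoDataFrame

    # Any Spark DataFrame with geometry-typed columns can be wrapped as-is.
    sdf = spark.sql("SELECT ST_Point(1.0, 2.0) AS geometry, 1 AS id")
    gdf = GeoDataFrame(sdf)  # stored via an InternalFrame, no pandas conversion
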
@@ -184,6 +199,55 @@ class GeoDataFrame(GeoFrame, pspd.DataFrame):
         # Implementation of the abstract method
         raise NotImplementedError("This method is not implemented yet.")
 
+    def _process_geometry_columns(
+        self, operation: str, rename_suffix: str = "", *args, **kwargs
+    ) -> GeoDataFrame:
+        """
+        Helper method to process geometry columns with a specified operation.
+
+        Parameters
+        ----------
+        operation : str
+            The spatial operation to apply (e.g., 'ST_Area', 'ST_Buffer').
+        rename_suffix : str, default ""
+            Suffix to append to the resulting column name.
+        args : tuple
+            Positional arguments for the operation.
+        kwargs : dict
+            Keyword arguments for the operation.
+
+        Returns
+        -------
+        GeoDataFrame
+            A new GeoDataFrame with the operation applied to geometry columns.
+        """
+        select_expressions = []
+
+        for field in self._internal.spark_frame.schema.fields:
+            col_name = field.name
+
+            # Skip index and order columns
+            if col_name in ("__index_level_0__", "__natural_order__"):
+                continue
+
+            if field.dataType.typeName() in ("geometrytype", "binary"):
+                # Prepare arguments for the operation
+                positional_params = ", ".join([repr(v) for v in args])
+                keyword_params = ", ".join([repr(v) for v in kwargs.values()])
+                params = ", ".join(filter(None, [positional_params, keyword_params]))
+
+                if field.dataType.typeName() == "binary":
+                    expr = f"{operation}(ST_GeomFromWKB(`{col_name}`){', ' + params if params else ''}) as {col_name}{rename_suffix}"
+                else:
+                    expr = f"{operation}(`{col_name}`{', ' + params if params else ''}) as {col_name}{rename_suffix}"
+                select_expressions.append(expr)
+            else:
+                # Keep non-geometry columns as they are
+                select_expressions.append(f"`{col_name}`")
+
+        sdf = self._internal.spark_frame.selectExpr(*select_expressions)
+        return GeoDataFrame(sdf)
+
     @property
     def dtypes(self) -> gpd.GeoSeries | pd.Series | Dtype:
         # Implementation of the abstract method
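
This helper centralizes the per-column selectExpr construction that `area` and `buffer` each built by hand before this commit (see the removed blocks below). A rough sketch of how a future accessor could delegate to it (the `centroid` wrapper is hypothetical and not part of this commit, though ST_Centroid is an existing Sedona SQL function):

    # Hypothetical wrapper over the new helper, for illustration only.
    def centroid(self) -> "GeoDataFrame":
        # ST_Centroid is applied to every geometry column (decoding WKB
        # where needed); non-geometry columns pass through unchanged.
        return self._process_geometry_columns("ST_Centroid", rename_suffix="_centroid")
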
@@ -256,42 +320,7 @@ class GeoDataFrame(GeoFrame, pspd.DataFrame):
         0           1.0      1
         1           4.0      2
         """
-        # Create a list of all column expressions for the new dataframe
-        select_expressions = []
-
-        # Process geometry columns to calculate areas
-        for field in self._internal.spark_frame.schema.fields:
-            col_name = field.name
-
-            # Skip index column to avoid duplication
-            if col_name == "__index_level_0__" or col_name == "__natural_order__":
-                continue
-
-            if (
-                field.dataType.typeName() == "geometrytype"
-                or field.dataType.typeName() == "binary"
-            ):
-                # Calculate the area for each geometry column
-                if field.dataType.typeName() == "binary":
-                    area_expr = (
-                        f"ST_Area(ST_GeomFromWKB(`{col_name}`)) as {col_name}_area"
-                    )
-                else:
-                    area_expr = f"ST_Area(`{col_name}`) as {col_name}_area"
-                select_expressions.append(area_expr)
-            else:
-                # Keep non-geometry columns as they are
-                select_expressions.append(f"`{col_name}`")
-
-        # Execute the query to get all data in one go
-        result_df = self._internal.spark_frame.selectExpr(*select_expressions)
-
-        # Convert to pandas DataFrame
-        pandas_df = result_df.toPandas()
-
-        # Create a new GeoDataFrame with the result
-        # Note: This avoids the need to manipulate the index columns separately
-        return GeoDataFrame(pandas_df)
+        return self._process_geometry_columns("ST_Area", rename_suffix="_area")
 
     @property
     def crs(self):
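
With the delegation in place, `area` stays a distributed selectExpr over the Spark frame instead of being assembled column by column and collected to pandas. A minimal usage sketch (illustrative; assumes `data` is any input the constructor accepts, such as a pandas DataFrame holding shapely geometries):

    gdf = GeoDataFrame(data)
    result = gdf.area  # geometry columns come back with an "_area" suffix
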
@@ -553,40 +582,9 @@ class GeoDataFrame(GeoFrame, pspd.DataFrame):
         >>> gdf = GeoDataFrame(data)
         >>> buffered = gdf.buffer(0.5)
         """
-        # Create a list of all column expressions for the new dataframe
-        select_expressions = []
-
-        # Process each field in the schema
-        for field in self._internal.spark_frame.schema.fields:
-            col_name = field.name
-
-            # Skip index and order columns
-            if col_name == "__index_level_0__" or col_name == "__natural_order__":
-                continue
-
-            # Apply buffer to geometry columns
-            if (
-                field.dataType.typeName() == "geometrytype"
-                or field.dataType.typeName() == "binary"
-            ):
-
-                if field.dataType.typeName() == "binary":
-                    # For binary geometry columns (WKB)
-                    buffer_expr = f"ST_Buffer(ST_GeomFromWKB(`{col_name}`), {distance}) as {col_name}"
-                else:
-                    # For native geometry columns
-                    buffer_expr = f"ST_Buffer(`{col_name}`, {distance}) as {col_name}"
-                select_expressions.append(buffer_expr)
-            else:
-                # Keep non-geometry columns as they are
-                select_expressions.append(f"`{col_name}`")
-
-        # Execute the query to get all data in one go
-        result_df = self._internal.spark_frame.selectExpr(*select_expressions)
-
-        # Convert to pandas DataFrame and create a new GeoDataFrame
-        pandas_df = result_df.toPandas()
-        return GeoDataFrame(pandas_df)
+        return self._process_geometry_columns(
+            "ST_Buffer", rename_suffix="_buffered", distance=distance
+        )
 
     def sjoin(
         self,
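
Two things are worth noting about the new `buffer` body: `distance` is passed as a keyword argument, so it reaches the generated SQL expression through the helper's kwargs handling, and the "_buffered" suffix means results land in new columns instead of overwriting the originals. The test update below asserts exactly that renaming:

    buffered = gdf.buffer(0.5)
    # the buffered geometry now lives in "geometry1_buffered", not "geometry1"
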
diff --git a/python/tests/geopandas/test_geodataframe.py b/python/tests/geopandas/test_geodataframe.py
index 30fcf24520..0a4f367727 100644
--- a/python/tests/geopandas/test_geodataframe.py
+++ b/python/tests/geopandas/test_geodataframe.py
@@ -162,17 +162,19 @@ class TestDataframe(TestBase):
         assert type(buffer_df) is GeoDataFrame
 
         # Verify the original columns are preserved
-        assert "geometry1" in buffer_df.columns
+        assert "geometry1_buffered" in buffer_df.columns
         assert "id" in buffer_df.columns
         assert "value" in buffer_df.columns
 
         # Convert to pandas to extract individual geometries
-        pandas_df = buffer_df._internal.spark_frame.select("geometry1").toPandas()
+        pandas_df = buffer_df._internal.spark_frame.select(
+            "geometry1_buffered"
+        ).toPandas()
 
         # Calculate areas to verify buffer was applied correctly
         # Point buffer with radius 0.5 should have area approximately π * 0.5² ≈ 0.785
         # Square buffer with radius 0.5 should expand the 1x1 square to a 2x2 square with rounded corners
-        areas = [geom.area for geom in pandas_df["geometry1"]]
+        areas = [geom.area for geom in pandas_df["geometry1_buffered"]]
 
         # Check that square buffer area is greater than original (1.0)
         assert areas[1] > 1.0
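
For reference, the expected magnitudes behind these assertions follow from plane geometry: buffering a point by r = 0.5 approximates a disc of area πr² ≈ 0.785, and buffering a unit square by r is a Minkowski sum that adds four edge strips plus four quarter-discs, giving 1 + 4·0.5 + π·0.25 ≈ 3.785. A quick sanity check (illustrative):

    import math

    point_buffer_area = math.pi * 0.5**2                 # ≈ 0.785
    square_buffer_area = 1 + 4 * 0.5 + math.pi * 0.5**2  # ≈ 3.785, well above 1.0
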
