This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch sedona-arrow-udf-example
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/sedona-arrow-udf-example by 
this push:
     new 340d0f79a4 SEDONA-721 Add docs.
340d0f79a4 is described below

commit 340d0f79a4bcc85f623bc03ec5af82d8e896372a
Author: pawelkocinski <[email protected]>
AuthorDate: Sun Mar 16 17:02:38 2025 +0100

    SEDONA-721 Add docs.
---
 .github/workflows/java.yml                  |  1 +
 python/sedona/sql/functions.py              | 64 +++++++++++++++++++++++++++++
 python/sedona/sql/udf.py                    | 19 ---------
 python/tests/utils/test_pandas_arrow_udf.py | 49 ++++++++++++++++++----
 4 files changed, 105 insertions(+), 28 deletions(-)

diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml
index f8e4f6b204..b8705aec9f 100644
--- a/.github/workflows/java.yml
+++ b/.github/workflows/java.yml
@@ -110,6 +110,7 @@ jobs:
           SKIP_TESTS: ${{ matrix.skipTests }}
         run: |
           SPARK_COMPAT_VERSION=${SPARK_VERSION:0:3}
+          pip freeze
           mvn -q clean install -Dspark=${SPARK_COMPAT_VERSION} -Dscala=${SCALA_VERSION:0:4} -Dspark.version=${SPARK_VERSION} ${SKIP_TESTS}
       - run: mkdir staging
       - run: cp spark-shaded/target/sedona-*.jar staging
diff --git a/python/sedona/sql/functions.py b/python/sedona/sql/functions.py
new file mode 100644
index 0000000000..c79072cd3c
--- /dev/null
+++ b/python/sedona/sql/functions.py
@@ -0,0 +1,64 @@
+from enum import Enum
+
+import pandas as pd
+
+from sedona.sql.types import GeometryType
+from sedona.utils import geometry_serde
+from pyspark.sql.udf import UserDefinedFunction
+
+SEDONA_SCALAR_EVAL_TYPE = 5200
+SEDONA_PANDAS_ARROW_NAME = "SedonaPandasArrowUDF"
+
+
+class SedonaUDFType(Enum):
+    SHAPELY_SCALAR = "ShapelyScalar"
+    GEO_SERIES = "GeoSeries"
+
+
+class InvalidSedonaUDFType(Exception):
+    pass
+
+
+sedona_udf_to_eval_type = {
+    SedonaUDFType.SHAPELY_SCALAR: SEDONA_SCALAR_EVAL_TYPE,
+    SedonaUDFType.GEO_SERIES: SEDONA_SCALAR_EVAL_TYPE,
+}
+
+
+def sedona_vectorized_udf(udf_type: SedonaUDFType = SedonaUDFType.SHAPELY_SCALAR):
+    def apply_fn(fn):
+        if udf_type == SedonaUDFType.SHAPELY_SCALAR:
+            return _apply_shapely_series_udf(fn)
+
+        if udf_type == SedonaUDFType.GEO_SERIES:
+            return _apply_geo_series_udf(fn)
+
+        raise InvalidSedonaUDFType(f"Invalid UDF type: {udf_type}")
+
+    return apply_fn
+
+
+def _apply_shapely_series_udf(fn):
+    def apply(series: pd.Series) -> pd.Series:
+        applied = series.apply(lambda x: fn(geometry_serde.deserialize(x)[0]))
+
+        return applied.apply(lambda x: geometry_serde.serialize(x))
+
+    return UserDefinedFunction(
+        apply, GeometryType(), "SedonaPandasArrowUDF", evalType=SEDONA_SCALAR_EVAL_TYPE
+    )
+
+
+def _apply_geo_series_udf(fn):
+    import geopandas as gpd
+
+    def apply(series: pd.Series) -> pd.Series:
+        geo_series = gpd.GeoSeries(
+            series.apply(lambda x: geometry_serde.deserialize(x)[0])
+        )
+
+        return fn(geo_series).apply(lambda x: geometry_serde.serialize(x))
+
+    return UserDefinedFunction(
+        apply, GeometryType(), "SedonaPandasArrowUDF", evalType=SEDONA_SCALAR_EVAL_TYPE
+    )
diff --git a/python/sedona/sql/udf.py b/python/sedona/sql/udf.py
deleted file mode 100644
index 6e723d0e10..0000000000
--- a/python/sedona/sql/udf.py
+++ /dev/null
@@ -1,19 +0,0 @@
-import pandas as pd
-
-from sedona.sql.types import GeometryType
-from sedona.utils import geometry_serde
-from pyspark.sql.udf import UserDefinedFunction
-
-
-SEDONA_SCALAR_EVAL_TYPE = 5200
-
-
-def sedona_vectorized_udf(fn):
-    def apply(series: pd.Series) -> pd.Series:
-        geo_series = series.apply(lambda x: fn(geometry_serde.deserialize(x)[0]))
-
-        return geo_series.apply(lambda x: geometry_serde.serialize(x))
-
-    return UserDefinedFunction(
-        apply, GeometryType(), "SedonaPandasArrowUDF", evalType=SEDONA_SCALAR_EVAL_TYPE
-    )
diff --git a/python/tests/utils/test_pandas_arrow_udf.py b/python/tests/utils/test_pandas_arrow_udf.py
index 7d59a93283..4147c65778 100644
--- a/python/tests/utils/test_pandas_arrow_udf.py
+++ b/python/tests/utils/test_pandas_arrow_udf.py
@@ -1,19 +1,27 @@
 from sedona.sql.types import GeometryType
-from sedona.sql.udf import sedona_vectorized_udf
+from sedona.sql.functions import sedona_vectorized_udf, SedonaUDFType
 from tests import chicago_crimes_input_location
 from tests.test_base import TestBase
 import pyspark.sql.functions as f
 import shapely.geometry.base as b
 from time import time
+import geopandas as gpd
 
 
 def non_vectorized_buffer_udf(geom: b.BaseGeometry) -> b.BaseGeometry:
-    return geom.buffer(0.001)
+    return geom.buffer(0.1)
 
 
-@sedona_vectorized_udf
+@sedona_vectorized_udf()
 def vectorized_buffer(geom: b.BaseGeometry) -> b.BaseGeometry:
-    return geom.buffer(0.001)
+    return geom.buffer(0.1)
+
+
+@sedona_vectorized_udf(udf_type=SedonaUDFType.GEO_SERIES)
+def vectorized_geo_series_buffer(series: gpd.GeoSeries) -> gpd.GeoSeries:
+    buffered = series.buffer(0.1)
+
+    return buffered
 
 
 buffer_distanced_udf = f.udf(non_vectorized_buffer_udf, GeometryType())
@@ -21,6 +29,13 @@ buffer_distanced_udf = f.udf(non_vectorized_buffer_udf, GeometryType())
 
 class TestSedonaArrowUDF(TestBase):
 
+    def get_area(self, df, udf_fn):
+        return (
+            df.select(udf_fn(f.col("geom")).alias("buffer"))
+            .selectExpr("SUM(ST_Area(buffer))")
+            .collect()[0][0]
+        )
+
     def test_pandas_arrow_udf(self):
         df = (
             self.spark.read.option("header", "true")
@@ -32,15 +47,31 @@ class TestSedonaArrowUDF(TestBase):
         vectorized_times = []
         non_vectorized_times = []
 
-        for i in range(10):
+        for i in range(5):
             start = time()
-            df = df.withColumn("buffer", vectorized_buffer(f.col("geom")))
-            df.count()
+            area1 = self.get_area(df, vectorized_buffer)
+
+            assert area1 > 478
+
             vectorized_times.append(time() - start)
 
-            df = df.withColumn("buffer", buffer_distanced_udf(f.col("geom")))
-            df.count()
+            area2 = self.get_area(df, buffer_distanced_udf)
+
+            assert area2 > 478
+
             non_vectorized_times.append(time() - start)
 
         for v, nv in zip(vectorized_times, non_vectorized_times):
             assert v < nv, "Vectorized UDF is slower than non-vectorized UDF"
+
+    def test_geo_series_udf(self):
+        df = (
+            self.spark.read.option("header", "true")
+            .format("csv")
+            .load(chicago_crimes_input_location)
+            .selectExpr("ST_Point(y, x) AS geom")
+        )
+
+        area = self.get_area(df, vectorized_buffer)
+
+        assert area > 478

Reply via email to