This is an automated email from the ASF dual-hosted git repository.
imbruced pushed a commit to branch sedona-arrow-udf-example
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/sedona-arrow-udf-example by
this push:
new 340d0f79a4 SEDONA-721 Add docs.
340d0f79a4 is described below
commit 340d0f79a4bcc85f623bc03ec5af82d8e896372a
Author: pawelkocinski <[email protected]>
AuthorDate: Sun Mar 16 17:02:38 2025 +0100
SEDONA-721 Add docs.
---
.github/workflows/java.yml | 1 +
python/sedona/sql/functions.py | 64 +++++++++++++++++++++++++++++
python/sedona/sql/udf.py | 19 ---------
python/tests/utils/test_pandas_arrow_udf.py | 49 ++++++++++++++++++----
4 files changed, 105 insertions(+), 28 deletions(-)
diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml
index f8e4f6b204..b8705aec9f 100644
--- a/.github/workflows/java.yml
+++ b/.github/workflows/java.yml
@@ -110,6 +110,7 @@ jobs:
SKIP_TESTS: ${{ matrix.skipTests }}
run: |
SPARK_COMPAT_VERSION=${SPARK_VERSION:0:3}
+ pip freeze
mvn -q clean install -Dspark=${SPARK_COMPAT_VERSION} -Dscala=${SCALA_VERSION:0:4} -Dspark.version=${SPARK_VERSION} ${SKIP_TESTS}
- run: mkdir staging
- run: cp spark-shaded/target/sedona-*.jar staging
diff --git a/python/sedona/sql/functions.py b/python/sedona/sql/functions.py
new file mode 100644
index 0000000000..c79072cd3c
--- /dev/null
+++ b/python/sedona/sql/functions.py
@@ -0,0 +1,64 @@
+from enum import Enum
+
+import pandas as pd
+
+from sedona.sql.types import GeometryType
+from sedona.utils import geometry_serde
+from pyspark.sql.udf import UserDefinedFunction
+
+SEDONA_SCALAR_EVAL_TYPE = 5200
+SEDONA_PANDAS_ARROW_NAME = "SedonaPandasArrowUDF"
+
+
+class SedonaUDFType(Enum):
+    SHAPELY_SCALAR = "ShapelyScalar"
+    GEO_SERIES = "GeoSeries"
+
+
+class InvalidSedonaUDFType(Exception):
+    pass
+
+
+sedona_udf_to_eval_type = {
+    SedonaUDFType.SHAPELY_SCALAR: SEDONA_SCALAR_EVAL_TYPE,
+    SedonaUDFType.GEO_SERIES: SEDONA_SCALAR_EVAL_TYPE,
+}
+
+
+def sedona_vectorized_udf(udf_type: SedonaUDFType = SedonaUDFType.SHAPELY_SCALAR):
+    def apply_fn(fn):
+        if udf_type == SedonaUDFType.SHAPELY_SCALAR:
+            return _apply_shapely_series_udf(fn)
+
+        if udf_type == SedonaUDFType.GEO_SERIES:
+            return _apply_geo_series_udf(fn)
+
+        raise InvalidSedonaUDFType(f"Invalid UDF type: {udf_type}")
+
+    return apply_fn
+
+
+def _apply_shapely_series_udf(fn):
+    def apply(series: pd.Series) -> pd.Series:
+        applied = series.apply(lambda x: fn(geometry_serde.deserialize(x)[0]))
+
+        return applied.apply(lambda x: geometry_serde.serialize(x))
+
+    return UserDefinedFunction(
+        apply, GeometryType(), "SedonaPandasArrowUDF", evalType=SEDONA_SCALAR_EVAL_TYPE
+    )
+
+
+def _apply_geo_series_udf(fn):
+    import geopandas as gpd
+
+    def apply(series: pd.Series) -> pd.Series:
+        geo_series = gpd.GeoSeries(
+            series.apply(lambda x: geometry_serde.deserialize(x)[0])
+        )
+
+        return fn(geo_series).apply(lambda x: geometry_serde.serialize(x))
+
+    return UserDefinedFunction(
+        apply, GeometryType(), "SedonaPandasArrowUDF", evalType=SEDONA_SCALAR_EVAL_TYPE
+    )
diff --git a/python/sedona/sql/udf.py b/python/sedona/sql/udf.py
deleted file mode 100644
index 6e723d0e10..0000000000
--- a/python/sedona/sql/udf.py
+++ /dev/null
@@ -1,19 +0,0 @@
-import pandas as pd
-
-from sedona.sql.types import GeometryType
-from sedona.utils import geometry_serde
-from pyspark.sql.udf import UserDefinedFunction
-
-
-SEDONA_SCALAR_EVAL_TYPE = 5200
-
-
-def sedona_vectorized_udf(fn):
-    def apply(series: pd.Series) -> pd.Series:
-        geo_series = series.apply(lambda x: fn(geometry_serde.deserialize(x)[0]))
-
-        return geo_series.apply(lambda x: geometry_serde.serialize(x))
-
-    return UserDefinedFunction(
-        apply, GeometryType(), "SedonaPandasArrowUDF", evalType=SEDONA_SCALAR_EVAL_TYPE
-    )
diff --git a/python/tests/utils/test_pandas_arrow_udf.py b/python/tests/utils/test_pandas_arrow_udf.py
index 7d59a93283..4147c65778 100644
--- a/python/tests/utils/test_pandas_arrow_udf.py
+++ b/python/tests/utils/test_pandas_arrow_udf.py
@@ -1,19 +1,27 @@
from sedona.sql.types import GeometryType
-from sedona.sql.udf import sedona_vectorized_udf
+from sedona.sql.functions import sedona_vectorized_udf, SedonaUDFType
from tests import chicago_crimes_input_location
from tests.test_base import TestBase
import pyspark.sql.functions as f
import shapely.geometry.base as b
from time import time
+import geopandas as gpd
def non_vectorized_buffer_udf(geom: b.BaseGeometry) -> b.BaseGeometry:
-    return geom.buffer(0.001)
+    return geom.buffer(0.1)
-@sedona_vectorized_udf
+@sedona_vectorized_udf()
def vectorized_buffer(geom: b.BaseGeometry) -> b.BaseGeometry:
-    return geom.buffer(0.001)
+    return geom.buffer(0.1)
+
+
+@sedona_vectorized_udf(udf_type=SedonaUDFType.GEO_SERIES)
+def vectorized_geo_series_buffer(series: gpd.GeoSeries) -> gpd.GeoSeries:
+    buffered = series.buffer(0.1)
+
+    return buffered
buffer_distanced_udf = f.udf(non_vectorized_buffer_udf, GeometryType())
@@ -21,6 +29,13 @@ buffer_distanced_udf = f.udf(non_vectorized_buffer_udf, GeometryType())
class TestSedonaArrowUDF(TestBase):
+    def get_area(self, df, udf_fn):
+        return (
+            df.select(udf_fn(f.col("geom")).alias("buffer"))
+            .selectExpr("SUM(ST_Area(buffer))")
+            .collect()[0][0]
+        )
+
    def test_pandas_arrow_udf(self):
        df = (
            self.spark.read.option("header", "true")
@@ -32,15 +47,31 @@ class TestSedonaArrowUDF(TestBase):
        vectorized_times = []
        non_vectorized_times = []
-        for i in range(10):
+        for i in range(5):
            start = time()
-            df = df.withColumn("buffer", vectorized_buffer(f.col("geom")))
-            df.count()
+            area1 = self.get_area(df, vectorized_buffer)
+
+            assert area1 > 478
+
            vectorized_times.append(time() - start)
-            df = df.withColumn("buffer", buffer_distanced_udf(f.col("geom")))
-            df.count()
+            area2 = self.get_area(df, buffer_distanced_udf)
+
+            assert area2 > 478
+
            non_vectorized_times.append(time() - start)
        for v, nv in zip(vectorized_times, non_vectorized_times):
            assert v < nv, "Vectorized UDF is slower than non-vectorized UDF"
+
+    def test_geo_series_udf(self):
+        df = (
+            self.spark.read.option("header", "true")
+            .format("csv")
+            .load(chicago_crimes_input_location)
+            .selectExpr("ST_Point(y, x) AS geom")
+        )
+
+        area = self.get_area(df, vectorized_buffer)
+
+        assert area > 478
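For reference, a minimal usage sketch of the decorator added in
python/sedona/sql/functions.py, mirroring the tests above. The function names
are illustrative, and the DataFrame "df" with its "geom" geometry column is
assumed to be supplied by the caller (e.g. built with ST_Point as in the tests):

    import geopandas as gpd
    import pyspark.sql.functions as f
    import shapely.geometry.base as b

    from sedona.sql.functions import SedonaUDFType, sedona_vectorized_udf

    # Scalar variant: the wrapped function receives one shapely geometry per row.
    @sedona_vectorized_udf()
    def buffer_geom(geom: b.BaseGeometry) -> b.BaseGeometry:
        return geom.buffer(0.1)

    # GeoSeries variant: the wrapped function receives a whole geopandas GeoSeries.
    @sedona_vectorized_udf(udf_type=SedonaUDFType.GEO_SERIES)
    def buffer_series(series: gpd.GeoSeries) -> gpd.GeoSeries:
        return series.buffer(0.1)

    # Both register as Arrow-based UDFs returning GeometryType and can be used
    # like any other column expression (df is a hypothetical caller-provided DataFrame).
    result = df.withColumn("buffer", buffer_geom(f.col("geom")))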