This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch arrow-worker
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit bbe268832096dbaf2d8ad42cabf8647a5eee9e70
Author: pawelkocinski <[email protected]>
AuthorDate: Tue Nov 11 13:38:52 2025 +0100

    SEDONA-745 Fix osm parser.
---
 sedonaworker/serializer/__init__.py                |   1 +
 sedonaworker/{ => serializer}/serializer.py        |   0
 sedonaworker/worker.py                             |   2 +-
 .../scala/org/apache/sedona/stats/data/_SUCCESS    |   0
 .../org/apache/spark/sql/udf/StrategySuite.scala   |  12 +-
 .../apache/spark/sql/udf/TestScalarPandasUDF.scala | 152 ++++++++++-----------
 6 files changed, 84 insertions(+), 83 deletions(-)

diff --git a/sedonaworker/serializer/__init__.py 
b/sedonaworker/serializer/__init__.py
new file mode 100644
index 0000000000..7a0599eac4
--- /dev/null
+++ b/sedonaworker/serializer/__init__.py
@@ -0,0 +1 @@
+from .serializer import SedonaArrowStreamPandasUDFSerializer
diff --git a/sedonaworker/serializer.py b/sedonaworker/serializer/serializer.py
similarity index 100%
rename from sedonaworker/serializer.py
rename to sedonaworker/serializer/serializer.py
diff --git a/sedonaworker/worker.py b/sedonaworker/worker.py
index 98ee38242b..c98b34b1d8 100644
--- a/sedonaworker/worker.py
+++ b/sedonaworker/worker.py
@@ -24,7 +24,7 @@ import time
 from inspect import currentframe, getframeinfo
 import importlib
 
-from sedonaworker.serializer import SedonaArrowStreamPandasUDFSerializer
+from serializer import SedonaArrowStreamPandasUDFSerializer
 
 has_resource_module = True
 try:
diff --git a/spark/common/src/test/scala/org/apache/sedona/stats/data/_SUCCESS 
b/spark/common/src/test/scala/org/apache/sedona/stats/data/_SUCCESS
new file mode 100644
index 0000000000..e69de29bb2
diff --git 
a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala 
b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
index 77ab4abbb8..48e8e1c878 100644
--- 
a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
+++ 
b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
@@ -21,7 +21,7 @@ package org.apache.spark.sql.udf
 import org.apache.sedona.spark.SedonaContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.{col, expr}
-import org.apache.spark.sql.udf.ScalarUDF.{geometryToGeometryFunction, 
geometryToNonGeometryFunction, geopandasGeometryToGeometryFunction, 
nonGeometryToGeometryFunction}
+import org.apache.spark.sql.udf.ScalarUDF.{geometryToGeometryFunction}
 import org.locationtech.jts.io.WKTReader
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.matchers.should.Matchers
@@ -45,16 +45,16 @@ class StrategySuite extends AnyFunSuite with Matchers {
 
   test("sedona geospatial UDF") {
 //    spark.sql("select 1").show()
-    val df = spark.read.format("parquet")
-      
.load("/Users/pawelkocinski/Desktop/projects/sedona-book/apache-sedona-book/book/chapter10/data/buildings/partitioned")
+    val df = spark.read.format("geoparquet")
+      
.load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings")
       .select(
-        geometryToNonGeometryFunction(col("geometry")),
+//        geometryToNonGeometryFunction(col("geometry")),
         geometryToGeometryFunction(col("geometry")),
-        nonGeometryToGeometryFunction(expr("ST_AsText(geometry)")),
-        col("geohash")
+//        nonGeometryToGeometryFunction(expr("ST_AsText(geometry)")),
       )
 
     df.show()
+    1 shouldBe 1
 
 //    val df = Seq(
 //      (1, "value", wktReader.read("POINT(21 52)")),
diff --git 
a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
 
b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
index 3006a14e14..2a3c1dbb2b 100644
--- 
a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
+++ 
b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
@@ -56,7 +56,7 @@ object ScalarUDF {
 
   private lazy val isPythonAvailable: Boolean = 
TestUtils.testCommandAvailable(pythonExec)
 
-  lazy val pythonVer: String = if (isPythonAvailable) {
+  val pythonVer: String = if (isPythonAvailable) {
     Process(
       Seq(pythonExec, "-c", "import sys; print('%d.%d' % 
sys.version_info[:2])"),
       None,
@@ -73,29 +73,29 @@ object ScalarUDF {
   }
 
   val additionalModule = 
"spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf"
-
-  val geopandasGeometryToNonGeometry: Array[Byte] = {
-    var binaryPandasFunc: Array[Byte] = null
-    withTempPath { path =>
-      Process(
-        Seq(
-          pythonExec,
-          "-c",
-          f"""
-            |from pyspark.sql.types import FloatType
-            |from pyspark.serializers import CloudPickleSerializer
-            |f = open('$path', 'wb');
-            |def apply_geopandas(x):
-            |    return x.area
-            |f.write(CloudPickleSerializer().dumps((apply_geopandas, 
FloatType())))
-            |""".stripMargin),
-        None,
-        "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
-      binaryPandasFunc = Files.readAllBytes(path.toPath)
-    }
-    assert(binaryPandasFunc != null)
-    binaryPandasFunc
-  }
+//
+//  val geopandasGeometryToNonGeometry: Array[Byte] = {
+//    var binaryPandasFunc: Array[Byte] = null
+//    withTempPath { path =>
+//      Process(
+//        Seq(
+//          pythonExec,
+//          "-c",
+//          f"""
+//            |from pyspark.sql.types import FloatType
+//            |from pyspark.serializers import CloudPickleSerializer
+//            |f = open('$path', 'wb');
+//            |def apply_geopandas(x):
+//            |    return x.area
+//            |f.write(CloudPickleSerializer().dumps((apply_geopandas, 
FloatType())))
+//            |""".stripMargin),
+//        None,
+//        "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
+//      binaryPandasFunc = Files.readAllBytes(path.toPath)
+//    }
+//    assert(binaryPandasFunc != null)
+//    binaryPandasFunc
+//  }
 
   val geopandasGeometryToGeometryFunction: Array[Byte] = {
     var binaryPandasFunc: Array[Byte] = null
@@ -119,49 +119,49 @@ object ScalarUDF {
     assert(binaryPandasFunc != null)
     binaryPandasFunc
   }
-
-  val geopandasNonGeometryToGeometryFunction: Array[Byte] = {
-    var binaryPandasFunc: Array[Byte] = null
-    withTempPath { path =>
-      Process(
-        Seq(
-          pythonExec,
-          "-c",
-          f"""
-             |from sedona.sql.types import GeometryType
-             |from shapely.wkt import loads
-             |from pyspark.serializers import CloudPickleSerializer
-             |f = open('$path', 'wb');
-             |def apply_geopandas(x):
-             |    return x.apply(lambda wkt: loads(wkt).buffer(1))
-             |f.write(CloudPickleSerializer().dumps((apply_geopandas, 
GeometryType())))
-             |""".stripMargin),
-        None,
-        "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
-      binaryPandasFunc = Files.readAllBytes(path.toPath)
-    }
-    assert(binaryPandasFunc != null)
-    binaryPandasFunc
-  }
+//
+//  val geopandasNonGeometryToGeometryFunction: Array[Byte] = {
+//    var binaryPandasFunc: Array[Byte] = null
+//    withTempPath { path =>
+//      Process(
+//        Seq(
+//          pythonExec,
+//          "-c",
+//          f"""
+//             |from sedona.sql.types import GeometryType
+//             |from shapely.wkt import loads
+//             |from pyspark.serializers import CloudPickleSerializer
+//             |f = open('$path', 'wb');
+//             |def apply_geopandas(x):
+//             |    return x.apply(lambda wkt: loads(wkt).buffer(1))
+//             |f.write(CloudPickleSerializer().dumps((apply_geopandas, 
GeometryType())))
+//             |""".stripMargin),
+//        None,
+//        "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
+//      binaryPandasFunc = Files.readAllBytes(path.toPath)
+//    }
+//    assert(binaryPandasFunc != null)
+//    binaryPandasFunc
+//  }
 
   private val workerEnv = new java.util.HashMap[String, String]()
     workerEnv.put("PYTHONPATH", s"$pysparkPythonPath:$pythonPath")
     SparkEnv.get.conf.set(PYTHON_WORKER_MODULE, "sedonaworker.worker")
     SparkEnv.get.conf.set(PYTHON_USE_DAEMON, false)
-
-  val geometryToNonGeometryFunction: UserDefinedPythonFunction = 
UserDefinedPythonFunction(
-    name = "geospatial_udf",
-    func = SimplePythonFunction(
-      command = geopandasGeometryToNonGeometry,
-      envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]],
-      pythonIncludes = List.empty[String].asJava,
-      pythonExec = pythonExec,
-      pythonVer = pythonVer,
-      broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava,
-      accumulator = null),
-    dataType = FloatType,
-    pythonEvalType = UDF.PythonEvalType.SQL_SCALAR_SEDONA_UDF,
-    udfDeterministic = true)
+//
+//  val geometryToNonGeometryFunction: UserDefinedPythonFunction = 
UserDefinedPythonFunction(
+//    name = "geospatial_udf",
+//    func = SimplePythonFunction(
+//      command = geopandasGeometryToNonGeometry,
+//      envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, 
String]],
+//      pythonIncludes = List.empty[String].asJava,
+//      pythonExec = pythonExec,
+//      pythonVer = pythonVer,
+//      broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava,
+//      accumulator = null),
+//    dataType = FloatType,
+//    pythonEvalType = UDF.PythonEvalType.SQL_SCALAR_SEDONA_UDF,
+//    udfDeterministic = true)
 
   val geometryToGeometryFunction: UserDefinedPythonFunction = 
UserDefinedPythonFunction(
     name = "geospatial_udf",
@@ -176,18 +176,18 @@ object ScalarUDF {
     dataType = GeometryUDT,
     pythonEvalType = UDF.PythonEvalType.SQL_SCALAR_SEDONA_UDF,
     udfDeterministic = true)
-
-  val nonGeometryToGeometryFunction: UserDefinedPythonFunction = 
UserDefinedPythonFunction(
-    name = "geospatial_udf",
-    func = SimplePythonFunction(
-      command = geopandasNonGeometryToGeometryFunction,
-      envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]],
-      pythonIncludes = List.empty[String].asJava,
-      pythonExec = pythonExec,
-      pythonVer = pythonVer,
-      broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava,
-      accumulator = null),
-    dataType = GeometryUDT,
-    pythonEvalType = UDF.PythonEvalType.SQL_SCALAR_SEDONA_UDF,
-    udfDeterministic = true)
+//
+//  val nonGeometryToGeometryFunction: UserDefinedPythonFunction = 
UserDefinedPythonFunction(
+//    name = "geospatial_udf",
+//    func = SimplePythonFunction(
+//      command = geopandasNonGeometryToGeometryFunction,
+//      envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, 
String]],
+//      pythonIncludes = List.empty[String].asJava,
+//      pythonExec = pythonExec,
+//      pythonVer = pythonVer,
+//      broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava,
+//      accumulator = null),
+//    dataType = GeometryUDT,
+//    pythonEvalType = UDF.PythonEvalType.SQL_SCALAR_SEDONA_UDF,
+//    udfDeterministic = true)
 }

Reply via email to