This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch arrow-worker
in repository https://gitbox.apache.org/repos/asf/sedona.git
commit bbe268832096dbaf2d8ad42cabf8647a5eee9e70 Author: pawelkocinski <[email protected]> AuthorDate: Tue Nov 11 13:38:52 2025 +0100 SEDONA-745 Fix osm parser. --- sedonaworker/serializer/__init__.py | 1 + sedonaworker/{ => serializer}/serializer.py | 0 sedonaworker/worker.py | 2 +- .../scala/org/apache/sedona/stats/data/_SUCCESS | 0 .../org/apache/spark/sql/udf/StrategySuite.scala | 12 +- .../apache/spark/sql/udf/TestScalarPandasUDF.scala | 152 ++++++++++----------- 6 files changed, 84 insertions(+), 83 deletions(-) diff --git a/sedonaworker/serializer/__init__.py b/sedonaworker/serializer/__init__.py new file mode 100644 index 0000000000..7a0599eac4 --- /dev/null +++ b/sedonaworker/serializer/__init__.py @@ -0,0 +1 @@ +from .serializer import SedonaArrowStreamPandasUDFSerializer diff --git a/sedonaworker/serializer.py b/sedonaworker/serializer/serializer.py similarity index 100% rename from sedonaworker/serializer.py rename to sedonaworker/serializer/serializer.py diff --git a/sedonaworker/worker.py b/sedonaworker/worker.py index 98ee38242b..c98b34b1d8 100644 --- a/sedonaworker/worker.py +++ b/sedonaworker/worker.py @@ -24,7 +24,7 @@ import time from inspect import currentframe, getframeinfo import importlib -from sedonaworker.serializer import SedonaArrowStreamPandasUDFSerializer +from serializer import SedonaArrowStreamPandasUDFSerializer has_resource_module = True try: diff --git a/spark/common/src/test/scala/org/apache/sedona/stats/data/_SUCCESS b/spark/common/src/test/scala/org/apache/sedona/stats/data/_SUCCESS new file mode 100644 index 0000000000..e69de29bb2 diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala index 77ab4abbb8..48e8e1c878 100644 --- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala +++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala @@ -21,7 +21,7 @@ package 
org.apache.spark.sql.udf import org.apache.sedona.spark.SedonaContext import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.{col, expr} -import org.apache.spark.sql.udf.ScalarUDF.{geometryToGeometryFunction, geometryToNonGeometryFunction, geopandasGeometryToGeometryFunction, nonGeometryToGeometryFunction} +import org.apache.spark.sql.udf.ScalarUDF.{geometryToGeometryFunction} import org.locationtech.jts.io.WKTReader import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers @@ -45,16 +45,16 @@ class StrategySuite extends AnyFunSuite with Matchers { test("sedona geospatial UDF") { // spark.sql("select 1").show() - val df = spark.read.format("parquet") - .load("/Users/pawelkocinski/Desktop/projects/sedona-book/apache-sedona-book/book/chapter10/data/buildings/partitioned") + val df = spark.read.format("geoparquet") + .load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings") .select( - geometryToNonGeometryFunction(col("geometry")), +// geometryToNonGeometryFunction(col("geometry")), geometryToGeometryFunction(col("geometry")), - nonGeometryToGeometryFunction(expr("ST_AsText(geometry)")), - col("geohash") +// nonGeometryToGeometryFunction(expr("ST_AsText(geometry)")), ) df.show() + 1 shouldBe 1 // val df = Seq( // (1, "value", wktReader.read("POINT(21 52)")), diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala index 3006a14e14..2a3c1dbb2b 100644 --- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala +++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala @@ -56,7 +56,7 @@ object ScalarUDF { private lazy val isPythonAvailable: Boolean = TestUtils.testCommandAvailable(pythonExec) - lazy val pythonVer: String = if (isPythonAvailable) { + val pythonVer: String = if 
(isPythonAvailable) { Process( Seq(pythonExec, "-c", "import sys; print('%d.%d' % sys.version_info[:2])"), None, @@ -73,29 +73,29 @@ object ScalarUDF { } val additionalModule = "spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf" - - val geopandasGeometryToNonGeometry: Array[Byte] = { - var binaryPandasFunc: Array[Byte] = null - withTempPath { path => - Process( - Seq( - pythonExec, - "-c", - f""" - |from pyspark.sql.types import FloatType - |from pyspark.serializers import CloudPickleSerializer - |f = open('$path', 'wb'); - |def apply_geopandas(x): - | return x.area - |f.write(CloudPickleSerializer().dumps((apply_geopandas, FloatType()))) - |""".stripMargin), - None, - "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!! - binaryPandasFunc = Files.readAllBytes(path.toPath) - } - assert(binaryPandasFunc != null) - binaryPandasFunc - } +// +// val geopandasGeometryToNonGeometry: Array[Byte] = { +// var binaryPandasFunc: Array[Byte] = null +// withTempPath { path => +// Process( +// Seq( +// pythonExec, +// "-c", +// f""" +// |from pyspark.sql.types import FloatType +// |from pyspark.serializers import CloudPickleSerializer +// |f = open('$path', 'wb'); +// |def apply_geopandas(x): +// | return x.area +// |f.write(CloudPickleSerializer().dumps((apply_geopandas, FloatType()))) +// |""".stripMargin), +// None, +// "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!! 
+// binaryPandasFunc = Files.readAllBytes(path.toPath) +// } +// assert(binaryPandasFunc != null) +// binaryPandasFunc +// } val geopandasGeometryToGeometryFunction: Array[Byte] = { var binaryPandasFunc: Array[Byte] = null @@ -119,49 +119,49 @@ object ScalarUDF { assert(binaryPandasFunc != null) binaryPandasFunc } - - val geopandasNonGeometryToGeometryFunction: Array[Byte] = { - var binaryPandasFunc: Array[Byte] = null - withTempPath { path => - Process( - Seq( - pythonExec, - "-c", - f""" - |from sedona.sql.types import GeometryType - |from shapely.wkt import loads - |from pyspark.serializers import CloudPickleSerializer - |f = open('$path', 'wb'); - |def apply_geopandas(x): - | return x.apply(lambda wkt: loads(wkt).buffer(1)) - |f.write(CloudPickleSerializer().dumps((apply_geopandas, GeometryType()))) - |""".stripMargin), - None, - "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!! - binaryPandasFunc = Files.readAllBytes(path.toPath) - } - assert(binaryPandasFunc != null) - binaryPandasFunc - } +// +// val geopandasNonGeometryToGeometryFunction: Array[Byte] = { +// var binaryPandasFunc: Array[Byte] = null +// withTempPath { path => +// Process( +// Seq( +// pythonExec, +// "-c", +// f""" +// |from sedona.sql.types import GeometryType +// |from shapely.wkt import loads +// |from pyspark.serializers import CloudPickleSerializer +// |f = open('$path', 'wb'); +// |def apply_geopandas(x): +// | return x.apply(lambda wkt: loads(wkt).buffer(1)) +// |f.write(CloudPickleSerializer().dumps((apply_geopandas, GeometryType()))) +// |""".stripMargin), +// None, +// "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!! 
+// binaryPandasFunc = Files.readAllBytes(path.toPath) +// } +// assert(binaryPandasFunc != null) +// binaryPandasFunc +// } private val workerEnv = new java.util.HashMap[String, String]() workerEnv.put("PYTHONPATH", s"$pysparkPythonPath:$pythonPath") SparkEnv.get.conf.set(PYTHON_WORKER_MODULE, "sedonaworker.worker") SparkEnv.get.conf.set(PYTHON_USE_DAEMON, false) - - val geometryToNonGeometryFunction: UserDefinedPythonFunction = UserDefinedPythonFunction( - name = "geospatial_udf", - func = SimplePythonFunction( - command = geopandasGeometryToNonGeometry, - envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]], - pythonIncludes = List.empty[String].asJava, - pythonExec = pythonExec, - pythonVer = pythonVer, - broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava, - accumulator = null), - dataType = FloatType, - pythonEvalType = UDF.PythonEvalType.SQL_SCALAR_SEDONA_UDF, - udfDeterministic = true) +// +// val geometryToNonGeometryFunction: UserDefinedPythonFunction = UserDefinedPythonFunction( +// name = "geospatial_udf", +// func = SimplePythonFunction( +// command = geopandasGeometryToNonGeometry, +// envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]], +// pythonIncludes = List.empty[String].asJava, +// pythonExec = pythonExec, +// pythonVer = pythonVer, +// broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava, +// accumulator = null), +// dataType = FloatType, +// pythonEvalType = UDF.PythonEvalType.SQL_SCALAR_SEDONA_UDF, +// udfDeterministic = true) val geometryToGeometryFunction: UserDefinedPythonFunction = UserDefinedPythonFunction( name = "geospatial_udf", @@ -176,18 +176,18 @@ object ScalarUDF { dataType = GeometryUDT, pythonEvalType = UDF.PythonEvalType.SQL_SCALAR_SEDONA_UDF, udfDeterministic = true) - - val nonGeometryToGeometryFunction: UserDefinedPythonFunction = UserDefinedPythonFunction( - name = "geospatial_udf", - func = SimplePythonFunction( - command = geopandasNonGeometryToGeometryFunction, - 
envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]], - pythonIncludes = List.empty[String].asJava, - pythonExec = pythonExec, - pythonVer = pythonVer, - broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava, - accumulator = null), - dataType = GeometryUDT, - pythonEvalType = UDF.PythonEvalType.SQL_SCALAR_SEDONA_UDF, - udfDeterministic = true) +// +// val nonGeometryToGeometryFunction: UserDefinedPythonFunction = UserDefinedPythonFunction( +// name = "geospatial_udf", +// func = SimplePythonFunction( +// command = geopandasNonGeometryToGeometryFunction, +// envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]], +// pythonIncludes = List.empty[String].asJava, +// pythonExec = pythonExec, +// pythonVer = pythonVer, +// broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava, +// accumulator = null), +// dataType = GeometryUDT, +// pythonEvalType = UDF.PythonEvalType.SQL_SCALAR_SEDONA_UDF, +// udfDeterministic = true) }
