This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch add-sedona-worker-daemon-mode
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 1b0a24f844b6247eac0970be964a17a43756997f
Author: pawelkocinski <[email protected]>
AuthorDate: Wed Jan 14 16:17:48 2026 +0100

    add sedonadb sedona udf worker example
---
 python/pyproject.toml                              | 49 +++++++++++++---------
 python/sedona/spark/utils/geometry_serde.py        |  6 ++-
 python/sedona/spark/worker/daemon.py               | 17 --------
 python/src/geom_buf.c                              |  2 -
 python/src/geomserde.c                             |  1 -
 python/src/geomserde_speedup_module.c              | 33 ++++++++++++++-
 python/tests/test_base.py                          |  2 +-
 .../org/apache/sedona/spark/SedonaContext.scala    |  1 +
 .../execution/python/SedonaPythonArrowInput.scala  | 30 -------------
 .../execution/python/SedonaPythonArrowOutput.scala |  5 ---
 .../org/apache/sedona/sql/SQLSyntaxTestScala.scala |  8 ++--
 .../org/apache/sedona/sql/TestBaseScala.scala      | 22 ++++------
 .../apache/spark/sql/udf/TestScalarPandasUDF.scala |  3 --
 13 files changed, 77 insertions(+), 102 deletions(-)

diff --git a/python/pyproject.toml b/python/pyproject.toml
index 1684d78a5f..185d11e7fa 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -26,7 +26,7 @@ description = "Apache Sedona is a cluster computing system for processing large-
 readme = "README.md"
 license = { text = "Apache-2.0" }
 authors = [ { name = "Apache Sedona", email = "[email protected]" } ]
-requires-python = ">=3.12"
+requires-python = ">=3.8"
 classifiers = [
     "Programming Language :: Python :: 3",
     "License :: OSI Approved :: Apache Software License",
@@ -50,16 +50,38 @@ kepler-map = ["geopandas", "keplergl==0.3.2"]
 flink = ["apache-flink>=1.19.0"]
 db = ["sedonadb[geopandas]; python_version >= '3.9'"]
 all = [
-#    "pyspark>=3.4.0,<4.1.0",
-#    "geopandas",
-#    "pydeck==0.8.0",
-#    "keplergl==0.3.2",
-#    "rasterio>=1.2.10",
+    "pyspark>=3.4.0,<4.1.0",
+    "geopandas",
+    "pydeck==0.8.0",
+    "keplergl==0.3.2",
+    "rasterio>=1.2.10",
 ]

 [dependency-groups]
 dev = [
-    "pytest>=9.0.2",
+    "pytest",
+    "pytest-cov",
+    "notebook==6.4.12",
+    "jupyter",
+    "mkdocs",
+    "scikit-learn",
+    "esda",
+    "libpysal",
+    "matplotlib", # implicit dependency of esda
+    # prevent incompatibility with pysal 4.7.0, which is what is resolved to when shapely >2 is specified
+    "scipy<=1.10.0",
+    "pandas>=2.0.0",
+    "numpy<2",
+    "geopandas",
+    # https://stackoverflow.com/questions/78949093/how-to-resolve-attributeerror-module-fiona-has-no-attribute-path
+    # cannot set geopandas>=0.14.4 since it doesn't support python 3.8, so we pin fiona to <1.10.0
+    "fiona<1.10.0",
+    "pyarrow",
+    "pyspark>=3.4.0,<4.1.0",
+    "keplergl==0.3.2",
+    "pydeck==0.8.0",
+    "pystac==1.5.0",
+    "rasterio>=1.2.10",
 ]

 [project.urls]
@@ -80,20 +102,7 @@ exclude = ["*.tests", "*.tests.*", "tests", "tests.*"]
 name = "sedona.spark.utils.geomserde_speedup"
 sources = [
     "src/geomserde_speedup_module.c",
-    "src/sedonaserde_vectorized_udf_module.c",
     "src/geomserde.c",
     "src/geom_buf.c",
     "src/geos_c_dyn.c",
 ]
-
-[[tool.setuptools.ext-modules]]
-name = "sedona.spark.utils.sedonaserde_vectorized_udf_module"
-sources = [
-    "src/sedonaserde_vectorized_udf_module.c",
-    "src/geomserde.c",
-    "src/geom_buf.c",
-    "src/geos_c_dyn.c",
-]
-
-[tool.uv.sources]
-sedonadb = { path = "../../../sedona-db/target/wheels/sedonadb-0.3.0-cp312-cp312-macosx_11_0_arm64.whl" }
diff --git a/python/sedona/spark/utils/geometry_serde.py b/python/sedona/spark/utils/geometry_serde.py
index 0ef3d4ed5c..103eb49817 100644
--- a/python/sedona/spark/utils/geometry_serde.py
+++ b/python/sedona/spark/utils/geometry_serde.py
@@ -25,6 +25,9 @@ from shapely.geometry.base import BaseGeometry

 speedup_enabled = False

+# Use geomserde_speedup when available, otherwise fall back to the general
+# pure Python implementation.
 try:
     from . import geomserde_speedup

@@ -57,9 +60,8 @@ try:
         def deserialize(buf: bytearray) -> Optional[BaseGeometry]:
             if buf is None:
                 return None
-            return geomserde_speedup.deserialize_2(buf)
+            return geomserde_speedup.deserialize(buf)

-        # Export the from_sedona_func for use with numpy ufuncs
         speedup_enabled = True

     elif shapely.__version__.startswith("1."):
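
The geometry_serde.py hunk above reverts the branch's experimental
deserialize_2 entry point back to the stock deserialize one. As a minimal
sketch of the fallback pattern the module relies on (assuming shapely 2.x;
the pure-Python fallback body is elided here, and the import path is
written absolute rather than relative for a runnable snippet):

    from typing import Optional
    from shapely.geometry.base import BaseGeometry

    speedup_enabled = False
    try:
        # Compiled C extension built from src/geomserde_speedup_module.c
        from sedona.spark.utils import geomserde_speedup

        def deserialize(buf: bytearray) -> Optional[BaseGeometry]:
            if buf is None:
                return None
            return geomserde_speedup.deserialize(buf)

        speedup_enabled = True
    except ImportError:
        # Pure-Python serde path; elided in this sketch
        def deserialize(buf: bytearray) -> Optional[BaseGeometry]:
            raise NotImplementedError
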
diff --git a/python/sedona/spark/worker/daemon.py b/python/sedona/spark/worker/daemon.py
index ce75e376ea..266baf76d5 100644
--- a/python/sedona/spark/worker/daemon.py
+++ b/python/sedona/spark/worker/daemon.py
@@ -40,23 +40,7 @@ def compute_real_exit_code(exit_code):
     else:
         return 1

-
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
-
-file_handler = logging.FileHandler(
-    "/Users/pawelkocinski/Desktop/projects/sedonaworker/sedonaworker/logs/worker_daemon_main.log",
-    delay=False,
-)
-file_handler.flush = file_handler.stream.flush
-
-logger.addHandler(file_handler)
-
-
 def worker(sock, authenticated):
-    logger.info(
-        "Starting worker process with pid =" + str(os.getpid()) + " socket " + str(sock)
-    )
     """
     Called by a worker process after the fork().
     """
@@ -207,7 +191,6 @@ def manager():
                 authenticated = False
                 while True:
                     code = worker(sock, authenticated)
-                    logger.info("Worker exited with code %d", code)
                     if code == 0:
                         authenticated = True
                     if not reuse or code:
diff --git a/python/src/geom_buf.c b/python/src/geom_buf.c
index d6a51bb3d0..5239de5ae0 100644
--- a/python/src/geom_buf.c
+++ b/python/src/geom_buf.c
@@ -208,8 +208,6 @@ SedonaErrorCode geom_buf_alloc(GeomBuffer *geom_buf,
   return SEDONA_SUCCESS;
 }

-#include <stdio.h>
-
 SedonaErrorCode read_geom_buf_header(const char *buf, int buf_size,
                                      GeomBuffer *geom_buf,
                                      CoordinateSequenceInfo *cs_info,
diff --git a/python/src/geomserde.c b/python/src/geomserde.c
index 81dafe216f..c1f7427738 100644
--- a/python/src/geomserde.c
+++ b/python/src/geomserde.c
@@ -718,7 +718,6 @@ static SedonaErrorCode deserialize_geom_buf(GEOSContextHandle_t handle,
   return SEDONA_SUCCESS;
 }

-#include <stdio.h>
 SedonaErrorCode sedona_deserialize_geom(GEOSContextHandle_t handle,
                                         const char *buf, int buf_size,
                                         GEOSGeometry **p_geom,
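
For context on the daemon.py hunk above: with the debug logger removed, the
manager loop is back to the upstream shape, where a worker handles one
connection at a time and a zero exit code marks the socket as authenticated
for reuse. A simplified sketch of that loop (forking, signal handling, and
error paths omitted; worker is the function from the file above):

    def serve(sock, reuse=False):
        """One iteration per task; mirrors the manager loop in daemon.py."""
        authenticated = False
        while True:
            code = worker(sock, authenticated)  # run one task, get exit code
            if code == 0:
                authenticated = True  # skip the auth handshake on reuse
            if not reuse or code:
                # non-reusable worker, or a failure: close the socket and let
                # the manager fork a fresh process for the next connection
                sock.close()
                return code
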
diff --git a/python/src/geomserde_speedup_module.c b/python/src/geomserde_speedup_module.c
index 1d7aefcd77..610c4d1b05 100644
--- a/python/src/geomserde_speedup_module.c
+++ b/python/src/geomserde_speedup_module.c
@@ -287,9 +287,24 @@ static PyObject *to_sedona_func(PyObject *self, PyObject *args) {
     PyObject *obj = objs[i];
     GEOSGeometry *geos_geom = NULL;
     char success = PyGEOS_GetGEOSGeometry(obj, &geos_geom);
+    if (!success || geos_geom == NULL) {
+      PyErr_SetString(PyExc_TypeError, "Invalid GEOS geometry");
+      Py_DECREF(out);
+      return NULL;
+    }
     PyObject *serialized = do_serialize(geos_geom);
-    PyArray_SETITEM(out, PyArray_GETPTR1(out, i), serialized);
+    if (!serialized) {
+      Py_DECREF(out);
+      return NULL;
+    }
+
+    if (PyArray_SETITEM(out, PyArray_GETPTR1(out, i), serialized) < 0) {
+      Py_DECREF(serialized);
+      Py_DECREF(out);
+      return NULL;
+    }
+    Py_DECREF(serialized);
   }

   return out;
@@ -318,6 +333,8 @@ static PyObject *from_sedona_func(PyObject *self, PyObject *args) {
     PyObject *obj = objs[i];
     if (!PyBytes_Check(obj)) {
       PyErr_SetString(PyExc_TypeError, "Expected bytes");
+      Py_DECREF(out);
+      return NULL;
     }

@@ -331,11 +348,23 @@
     sedona_deserialize_geom(handle, buf, len, &geom, &p_bytes_read);
     if (err != SEDONA_SUCCESS) {
       handle_geomserde_error(err);
+      Py_DECREF(out);
       return NULL;
     }

+    PyObject *pygeom = PyGEOS_CreateGeometry(geom, handle);
+    if (!pygeom) {
+      Py_DECREF(out);
+      return NULL;
+    }
+
+    if (PyArray_SETITEM(out, PyArray_GETPTR1(out, i), pygeom) < 0) {
+      Py_DECREF(pygeom);
+      Py_DECREF(out);
+      return NULL;
+    }
-
-    PyArray_SETITEM(out, PyArray_GETPTR1(out, i), pygeom);
+    Py_DECREF(pygeom);
   }

   return out;
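
The reference-counting fixes above matter because PyArray_SETITEM does not
steal the reference to the stored element, so each serialized or
deserialized object must be released after insertion or the loop leaks one
object per row. From the Python side, the affected code paths amount to a
round trip like this (a sketch only; it assumes the speedup extension is
compiled and shapely 2.x is installed):

    from shapely.geometry import Point
    from sedona.spark.utils import geometry_serde

    geom = Point(1.0, 2.0)
    buf = geometry_serde.serialize(geom)        # Sedona binary serde payload
    restored = geometry_serde.deserialize(buf)  # back to a shapely geometry
    print(restored)  # POINT (1 2)
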
diff --git a/python/tests/test_base.py b/python/tests/test_base.py
index 300d937d27..3974930207 100644
--- a/python/tests/test_base.py
+++ b/python/tests/test_base.py
@@ -77,7 +77,7 @@ class TestBase:
                 "sedona.spark.worker.daemon",
             )
             .config(
-                "sedona.python.worker.daemon.enabled", "false"
+                "sedona.python.worker.daemon.enabled", "true"
             )
             # Pandas on PySpark doesn't work with ANSI mode, which is enabled by default
             # in Spark 4
             .config("spark.sql.ansi.enabled", "false")
diff --git a/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala b/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
index add3caf225..c9e8497f7e 100644
--- a/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
@@ -41,6 +41,7 @@ class InternalApi(
     extends StaticAnnotation

 object SedonaContext {
+
   private def customOptimizationsWithSession(sparkSession: SparkSession) =
     Seq(
       new TransformNestedUDTParquet(sparkSession),
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
index 2544e63a97..6602967351 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
@@ -18,23 +18,6 @@
  */
 package org.apache.spark.sql.execution.python

-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 import org.apache.arrow.vector.VectorSchemaRoot
 import org.apache.arrow.vector.ipc.ArrowStreamWriter
 import org.apache.spark.sql.catalyst.InternalRow
@@ -87,21 +70,8 @@ private[python] trait SedonaPythonArrowInput[IN] extends PythonArrowInput[IN] {

           writeIteratorToArrowStream(root, writer, dataOut, inputIterator)

-          // end writes footer to the output stream and doesn't clean any resources.
-          // It could throw exception if the output stream is closed, so it should be
-          // in the try block.
           writer.end()
         } {
-          // If we close root and allocator in TaskCompletionListener, there could be a race
-          // condition where the writer thread keeps writing to the VectorSchemaRoot while
-          // it's being closed by the TaskCompletion listener.
-          // Closing root and allocator here is cleaner because root and allocator is owned
-          // by the writer thread and is only visible to the writer thread.
-          //
-          // If the writer thread is interrupted by TaskCompletionListener, it should either
-          // (1) in the try block, in which case it will get an InterruptedException when
-          // performing io, and goes into the finally block or (2) in the finally block,
-          // in which case it will ignore the interruption and close the resources.
           root.close()
           allocator.close()
         }
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
index 27764c2a54..8940a376a2 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
@@ -99,11 +99,6 @@ private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonR
       }
       eos = true
     }
-//  def handleEndOfDataSectionSedona(): Unit = {
-//    if (stream.readInt() == SpecialLengths.END_OF_STREAM) {}
-//
-//    eos = true
-//  }

     protected override def handleEndOfDataSection(): Unit = {
       handleEndOfDataSectionSedona()
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala
index 72a27461f6..6f873d0a08 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala
@@ -47,11 +47,11 @@ class SQLSyntaxTestScala extends TestBaseScala with TableDrivenPropertyChecks {
       try {
         sparkSession.sql("CREATE TABLE T_TEST_EXPLICIT_GEOMETRY (GEO_COL GEOMETRY)")
         sparkSession.catalog.tableExists("T_TEST_EXPLICIT_GEOMETRY") should be(true)
-//      sparkSession.sparkContext.getConf.get(keyParserExtension) should be("true")
+        sparkSession.sparkContext.getConf.get(keyParserExtension) should be("true")
       } catch {
         case ex: Exception =>
           ex.getClass.getName.endsWith("ParseException") should be(true)
-//        sparkSession.sparkContext.getConf.get(keyParserExtension) should be("false")
+          sparkSession.sparkContext.getConf.get(keyParserExtension) should be("false")
       }
     }

@@ -61,11 +61,11 @@ class SQLSyntaxTestScala extends TestBaseScala with TableDrivenPropertyChecks {
         sparkSession.sql(
          "CREATE TABLE T_TEST_EXPLICIT_GEOMETRY_2 (INT_COL INT, GEO_COL GEOMETRY)")
         sparkSession.catalog.tableExists("T_TEST_EXPLICIT_GEOMETRY_2") should be(true)
-//      sparkSession.sparkContext.getConf.get(keyParserExtension) should be("true")
+        sparkSession.sparkContext.getConf.get(keyParserExtension) should be("true")
       } catch {
         case ex: Exception =>
           ex.getClass.getName.endsWith("ParseException") should be(true)
-//        sparkSession.sparkContext.getConf.get(keyParserExtension) should be("false")
+          sparkSession.sparkContext.getConf.get(keyParserExtension) should be("false")
       }
     }
   }
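
The test_base.py hunk above now runs the Python suite with the worker
daemon enabled. The same switch can be flipped on any session; a sketch of
a local setup using the config keys that appear in these hunks:

    from sedona.spark import SedonaContext

    config = (
        SedonaContext.builder()
        .master("local[*]")
        # module that runs each UDF task in the Python worker
        .config("sedona.python.worker.udf.module", "sedona.spark.worker.worker")
        # long-lived daemon that forks workers instead of spawning one per task
        .config("sedona.python.worker.udf.daemon.module", "sedona.spark.worker.daemon")
        .config("sedona.python.worker.daemon.enabled", "true")
        .getOrCreate()
    )
    sedona = SedonaContext.create(config)
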
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index c9b4d6ac28..50d751f484 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -30,13 +30,13 @@ import java.io.FileInputStream
 import java.util.concurrent.ThreadLocalRandom

 trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
-//  Logger.getRootLogger().setLevel(Level.WARN)
-//  Logger.getLogger("org.apache").setLevel(Level.WARN)
-//  Logger.getLogger("com").setLevel(Level.WARN)
-//  Logger.getLogger("akka").setLevel(Level.WARN)
-//  Logger.getLogger("org.apache.sedona.core").setLevel(Level.WARN)
+  Logger.getRootLogger().setLevel(Level.WARN)
+  Logger.getLogger("org.apache").setLevel(Level.WARN)
+  Logger.getLogger("com").setLevel(Level.WARN)
+  Logger.getLogger("akka").setLevel(Level.WARN)
+  Logger.getLogger("org.apache.sedona.core").setLevel(Level.WARN)

-//  val keyParserExtension = "spark.sedona.enableParserExtension"
+  val keyParserExtension = "spark.sedona.enableParserExtension"
   val warehouseLocation = System.getProperty("user.dir") + "/target/"
   val sparkSession = SedonaContext
     .builder()
@@ -49,17 +49,9 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
     .config("sedona.python.worker.udf.module", "sedona.spark.worker.worker")
     .config("sedona.python.worker.udf.daemon.module", "sedonaworker.daemon")
     .config("sedona.python.worker.daemon.enabled", "false")
-//    .config(keyParserExtension, ThreadLocalRandom.current().nextBoolean())
+    .config(keyParserExtension, ThreadLocalRandom.current().nextBoolean())
     .getOrCreate()

-//  private val useDaemon: Boolean =
-//    SparkEnv.get.conf.getBoolean("sedona.python.worker.daemon.enabled", false)
-//
-//  private val sedonaUDFWorkerModule =
-//    SparkEnv.get.conf.get("sedona.python.worker.udf.module", "sedona.spark.worker.worker")
-//
-//  private val sedonaDaemonModule =
-//    SparkEnv.get.conf.get("sedona.python.worker.udf.daemon.module", "sedona.spark.worker.daemon")
   val sparkSessionMinio = SedonaContext
     .builder()
     .master("local[*]")
diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
index 23aac14bbe..d2c0d71c70 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
@@ -45,9 +45,6 @@ object ScalarUDF {
     }
   }

-  SparkEnv.get.conf.set(PYTHON_USE_DAEMON, false)
-  SparkEnv.get.conf.set(PYTHON_WORKER_MODULE, "sedonaworker.work")
-
   private[spark] lazy val pythonPath = sys.env.getOrElse("PYTHONPATH", "")

   protected lazy val sparkHome: String = {
     sys.props.getOrElse("spark.test.home", sys.env("SPARK_HOME"))
