This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch add-sedona-worker-daemon-mode
in repository https://gitbox.apache.org/repos/asf/sedona.git
commit 188205ac34fec03df1777261a497aae8062de05f Author: pawelkocinski <[email protected]> AuthorDate: Tue Jan 13 00:25:30 2026 +0100 add sedonadb sedona udf worker example --- python/pyproject.toml | 14 +- python/sedona/spark/sql/functions.py | 22 +- python/sedona/spark/utils/geometry_serde.py | 6 +- python/sedona/spark/utils/udf.py | 26 +++ python/sedona/spark/worker/daemon.py | 227 +++++++++++++++++++++ python/sedona/spark/worker/serde.py | 6 +- python/sedona/spark/worker/udf_info.py | 9 +- python/sedona/spark/worker/worker.py | 18 +- python/setup.py | 6 + python/src/geom_buf.c | 2 + python/src/geomserde.c | 1 + python/src/geomserde_speedup_module.c | 99 ++++++++- python/tests/test_base.py | 2 + .../tests/utils/test_sedona_db_vectorized_udf.py | 132 +++++++++++- .../org/apache/sedona/sql/UDF/PythonEvalType.scala | 4 +- .../execution/python/SedonaArrowPythonRunner.scala | 6 +- .../sql/execution/python/SedonaArrowStrategy.scala | 10 +- .../execution/python/SedonaBasePythonRunner.scala | 12 +- .../execution/python/SedonaDBWorkerFactory.scala | 14 +- .../execution/python/SedonaPythonArrowInput.scala | 3 + .../execution/python/SedonaPythonArrowOutput.scala | 3 +- .../spark/sql/execution/python/WorkerContext.scala | 16 +- .../spark/sql/udf/ExtractSedonaUDFRule.scala | 3 +- .../org/apache/sedona/sql/TestBaseScala.scala | 4 +- .../org/apache/spark/sql/udf/StrategySuite.scala | 59 +++--- 25 files changed, 602 insertions(+), 102 deletions(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index 29c7d0c388..1684d78a5f 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -16,7 +16,7 @@ # under the License. [build-system] -requires = ["setuptools>=80.9.0", "wheel"] +requires = ["setuptools>=80.9.0", "wheel", "numpy"] build-backend = "setuptools.build_meta" [project] @@ -36,6 +36,7 @@ dependencies = [ "geoarrow-c>=0.3.1", "geoarrow-pyarrow>=0.2.0", "geopandas>=1.1.2", + "numpy>=2.1.3", "pyarrow>=16.1.0", "pyspark==3.5.4", "sedonadb", @@ -79,14 +80,19 @@ exclude = ["*.tests", "*.tests.*", "tests", "tests.*"] name = "sedona.spark.utils.geomserde_speedup" sources = [ "src/geomserde_speedup_module.c", + "src/sedonaserde_vectorized_udf_module.c", "src/geomserde.c", "src/geom_buf.c", "src/geos_c_dyn.c", ] -[tool.uv] -dev-dependencies = [ - "pytest>=9.0.2", +[[tool.setuptools.ext-modules]] +name = "sedona.spark.utils.sedonaserde_vectorized_udf_module" +sources = [ + "src/sedonaserde_vectorized_udf_module.c", + "src/geomserde.c", + "src/geom_buf.c", + "src/geos_c_dyn.c", ] [tool.uv.sources] diff --git a/python/sedona/spark/sql/functions.py b/python/sedona/spark/sql/functions.py index 7c480e1700..232ccb50a3 100644 --- a/python/sedona/spark/sql/functions.py +++ b/python/sedona/spark/sql/functions.py @@ -28,8 +28,9 @@ import pyarrow as pa import geoarrow.pyarrow as ga from sedonadb import udf as sedona_udf_module from sedona.spark.sql.types import GeometryType -from pyspark.sql.types import DataType, FloatType, DoubleType, IntegerType, StringType +from pyspark.sql.types import DataType, FloatType, DoubleType, IntegerType, StringType, ByteType +from sedona.spark.utils.udf import has_sedona_serializer_speedup SEDONA_SCALAR_EVAL_TYPE = 5200 SEDONA_PANDAS_ARROW_NAME = "SedonaPandasArrowUDF" @@ -51,7 +52,7 @@ sedona_udf_to_eval_type = { def sedona_vectorized_udf( - return_type: DataType, udf_type: SedonaUDFType = SedonaUDFType.SHAPELY_SCALAR + return_type: DataType, udf_type: SedonaUDFType = SedonaUDFType.SHAPELY_SCALAR ): import geopandas as gpd @@ -92,7 +93,7 @@ def sedona_vectorized_udf( 
def _apply_shapely_series_udf( - fn, return_type: DataType, serialize_geom: bool, deserialize_geom: bool + fn, return_type: DataType, serialize_geom: bool, deserialize_geom: bool ): def apply(series: pd.Series) -> pd.Series: applied = series.apply( @@ -113,7 +114,7 @@ def _apply_shapely_series_udf( def _apply_geo_series_udf( - fn, return_type: DataType, serialize_geom: bool, deserialize_geom: bool + fn, return_type: DataType, serialize_geom: bool, deserialize_geom: bool ): import geopandas as gpd @@ -161,6 +162,7 @@ def infer_pa_type(spark_type: DataType): else: raise NotImplementedError(f"Type {spark_type} is not supported yet.") + def infer_input_type(spark_type: DataType): if isinstance(spark_type, GeometryType): return sedona_udf_module.GEOMETRY @@ -168,9 +170,12 @@ def infer_input_type(spark_type: DataType): return sedona_udf_module.NUMERIC elif isinstance(spark_type, StringType): return sedona_udf_module.STRING + elif isinstance(spark_type, ByteType): + return sedona_udf_module.BINARY else: raise NotImplementedError(f"Type {spark_type} is not supported yet.") + def infer_input_types(spark_types: list[DataType]): pa_types = [] for spark_type in spark_types: @@ -182,8 +187,12 @@ def infer_input_types(spark_types: list[DataType]): def sedona_db_vectorized_udf( return_type: DataType, - input_types: list[DataType] + input_types: list[DataType], ): + eval_type = 6201 + if has_sedona_serializer_speedup(): + eval_type = 6200 + def apply_fn(fn): out_type = infer_pa_type(return_type) input_types_sedona_db = infer_input_types(input_types) @@ -193,10 +202,9 @@ def sedona_db_vectorized_udf( return fn(*args, **kwargs) udf = UserDefinedFunction( - lambda: shapely_udf, return_type, "SedonaPandasArrowUDF", evalType=6200 + lambda: shapely_udf, return_type, "SedonaPandasArrowUDF", evalType=eval_type ) return udf - return apply_fn diff --git a/python/sedona/spark/utils/geometry_serde.py b/python/sedona/spark/utils/geometry_serde.py index 103eb49817..0ef3d4ed5c 100644 --- a/python/sedona/spark/utils/geometry_serde.py +++ b/python/sedona/spark/utils/geometry_serde.py @@ -25,9 +25,6 @@ from shapely.geometry.base import BaseGeometry speedup_enabled = False - -# Use geomserde_speedup when available, otherwise fallback to general pure -# python implementation. try: from . import geomserde_speedup @@ -60,8 +57,9 @@ try: def deserialize(buf: bytearray) -> Optional[BaseGeometry]: if buf is None: return None - return geomserde_speedup.deserialize(buf) + return geomserde_speedup.deserialize_2(buf) + # Export the from_sedona_func for use with numpy ufuncs speedup_enabled = True elif shapely.__version__.startswith("1."): diff --git a/python/sedona/spark/utils/udf.py b/python/sedona/spark/utils/udf.py new file mode 100644 index 0000000000..01a38a675a --- /dev/null +++ b/python/sedona/spark/utils/udf.py @@ -0,0 +1,26 @@ +import shapely + + +def has_sedona_serializer_speedup(): + try: + from . import geomserde_speedup + except ImportError: + return False + return True + +def to_sedona_func(arr): + try: + from . import geomserde_speedup + except ImportError: + return shapely.to_wkb(arr) + + return geomserde_speedup.to_sedona_func(arr) + + +def from_sedona_func(arr): + try: + from . 
import geomserde_speedup + except ImportError: + return shapely.from_wkb(arr) + + return geomserde_speedup.from_sedona_func(arr) diff --git a/python/sedona/spark/worker/daemon.py b/python/sedona/spark/worker/daemon.py new file mode 100644 index 0000000000..0d64a543c5 --- /dev/null +++ b/python/sedona/spark/worker/daemon.py @@ -0,0 +1,227 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import logging +import numbers +import os +import signal +import select +import socket +import sys +import traceback +import time +import gc +from errno import EINTR, EAGAIN +from socket import AF_INET, AF_INET6, SOCK_STREAM, SOMAXCONN +from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT + +from sedona.spark.worker.worker import main as worker_main +from pyspark.serializers import read_int, write_int, write_with_length, UTF8Deserializer + + +def compute_real_exit_code(exit_code): + # SystemExit's code can be integer or string, but os._exit only accepts integers + if isinstance(exit_code, numbers.Integral): + return exit_code + else: + return 1 + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +file_handler = logging.FileHandler("/Users/pawelkocinski/Desktop/projects/sedonaworker/sedonaworker/logs/worker_daemon_main.log", delay=False) +file_handler.flush = file_handler.stream.flush + +logger.addHandler(file_handler) + +def worker(sock, authenticated): + logger.info("Starting worker process with pid =" + str(os.getpid()) + " socket " + str(sock)) + """ + Called by a worker process after the fork(). + """ + signal.signal(SIGHUP, SIG_DFL) + signal.signal(SIGCHLD, SIG_DFL) + signal.signal(SIGTERM, SIG_DFL) + # restore the handler for SIGINT, + # it's useful for debugging (show the stacktrace before exit) + signal.signal(SIGINT, signal.default_int_handler) + + # Read the socket using fdopen instead of socket.makefile() because the latter + # seems to be very slow; note that we need to dup() the file descriptor because + # otherwise writes also cause a seek that makes us miss data on the read side. 
+ buffer_size = int(os.environ.get("SPARK_BUFFER_SIZE", 65536)) + infile = os.fdopen(os.dup(sock.fileno()), "rb", buffer_size) + outfile = os.fdopen(os.dup(sock.fileno()), "wb", buffer_size) + + if not authenticated: + client_secret = UTF8Deserializer().loads(infile) + if os.environ["PYTHON_WORKER_FACTORY_SECRET"] == client_secret: + write_with_length("ok".encode("utf-8"), outfile) + outfile.flush() + else: + write_with_length("err".encode("utf-8"), outfile) + outfile.flush() + sock.close() + return 1 + + exit_code = 0 + try: + worker_main(infile, outfile) + except SystemExit as exc: + exit_code = compute_real_exit_code(exc.code) + finally: + try: + outfile.flush() + except Exception: + pass + return exit_code + + +def manager(): + # Create a new process group to corral our children + os.setpgid(0, 0) + + # Create a listening socket on the loopback interface + if os.environ.get("SPARK_PREFER_IPV6", "false").lower() == "true": + listen_sock = socket.socket(AF_INET6, SOCK_STREAM) + listen_sock.bind(("::1", 0, 0, 0)) + listen_sock.listen(max(1024, SOMAXCONN)) + listen_host, listen_port, _, _ = listen_sock.getsockname() + else: + listen_sock = socket.socket(AF_INET, SOCK_STREAM) + listen_sock.bind(("127.0.0.1", 0)) + listen_sock.listen(max(1024, SOMAXCONN)) + listen_host, listen_port = listen_sock.getsockname() + + # re-open stdin/stdout in 'wb' mode + stdin_bin = os.fdopen(sys.stdin.fileno(), "rb", 4) + stdout_bin = os.fdopen(sys.stdout.fileno(), "wb", 4) + write_int(listen_port, stdout_bin) + stdout_bin.flush() + + def shutdown(code): + signal.signal(SIGTERM, SIG_DFL) + # Send SIGHUP to notify workers of shutdown + os.kill(0, SIGHUP) + sys.exit(code) + + def handle_sigterm(*args): + shutdown(1) + + signal.signal(SIGTERM, handle_sigterm) # Gracefully exit on SIGTERM + signal.signal(SIGHUP, SIG_IGN) # Don't die on SIGHUP + signal.signal(SIGCHLD, SIG_IGN) + + reuse = os.environ.get("SPARK_REUSE_WORKER") + + # Initialization complete + try: + while True: + try: + ready_fds = select.select([0, listen_sock], [], [], 1)[0] + except select.error as ex: + if ex[0] == EINTR: + continue + else: + raise + + if 0 in ready_fds: + try: + worker_pid = read_int(stdin_bin) + except EOFError: + # Spark told us to exit by closing stdin + shutdown(0) + try: + os.kill(worker_pid, signal.SIGKILL) + except OSError: + pass # process already died + + if listen_sock in ready_fds: + try: + sock, _ = listen_sock.accept() + except OSError as e: + if e.errno == EINTR: + continue + raise + + # Launch a worker process + try: + pid = os.fork() + except OSError as e: + if e.errno in (EAGAIN, EINTR): + time.sleep(1) + pid = os.fork() # error here will shutdown daemon + else: + outfile = sock.makefile(mode="wb") + write_int(e.errno, outfile) # Signal that the fork failed + outfile.flush() + outfile.close() + sock.close() + continue + + if pid == 0: + # in child process + listen_sock.close() + + # It should close the standard input in the child process so that + # Python native function executions stay intact. + # + # Note that if we just close the standard input (file descriptor 0), + # the lowest file descriptor (file descriptor 0) will be allocated, + # later when other file descriptors should happen to open. + # + # Therefore, here we redirects it to '/dev/null' by duplicating + # another file descriptor for '/dev/null' to the standard input (0). + # See SPARK-26175. 
+ devnull = open(os.devnull, "r") + os.dup2(devnull.fileno(), 0) + devnull.close() + + try: + # Acknowledge that the fork was successful + outfile = sock.makefile(mode="wb") + write_int(os.getpid(), outfile) + outfile.flush() + outfile.close() + authenticated = False + while True: + code = worker(sock, authenticated) + logger.info("Worker exited with code %d", code) + if code == 0: + authenticated = True + if not reuse or code: + # wait for closing + try: + while sock.recv(1024): + pass + except Exception: + pass + break + gc.collect() + except BaseException: + traceback.print_exc() + os._exit(1) + else: + os._exit(0) + else: + sock.close() + + finally: + shutdown(1) + + +if __name__ == "__main__": + manager() diff --git a/python/sedona/spark/worker/serde.py b/python/sedona/spark/worker/serde.py index 3954d075b7..5a33a26610 100644 --- a/python/sedona/spark/worker/serde.py +++ b/python/sedona/spark/worker/serde.py @@ -4,10 +4,11 @@ from pyspark.sql.pandas.serializers import ArrowStreamPandasSerializer from sedona.spark.worker.udf_info import UDFInfo class SedonaDBSerializer(ArrowStreamPandasSerializer): - def __init__(self, timezone, safecheck, db, udf_info: UDFInfo): + def __init__(self, timezone, safecheck, db, udf_info: UDFInfo, cast_to_wkb=False): super(SedonaDBSerializer, self).__init__(timezone, safecheck) self.db = db self.udf_info = udf_info + self.cast_to_wkb = cast_to_wkb def load_stream(self, stream): import pyarrow as pa @@ -22,7 +23,7 @@ class SedonaDBSerializer(ArrowStreamPandasSerializer): df.to_view(table_name) - sql_expression = self.udf_info.sedona_db_transformation_expr(table_name) + sql_expression = self.udf_info.sedona_db_transformation_expr(table_name, self.cast_to_wkb) index += 1 @@ -37,7 +38,6 @@ class SedonaDBSerializer(ArrowStreamPandasSerializer): if writer is None: writer = pa.RecordBatchStreamWriter(stream, batch.schema) writer.write_batch(batch) - # stream.flush() finally: if writer is not None: writer.close() diff --git a/python/sedona/spark/worker/udf_info.py b/python/sedona/spark/worker/udf_info.py index d354bcea7e..7853133e77 100644 --- a/python/sedona/spark/worker/udf_info.py +++ b/python/sedona/spark/worker/udf_info.py @@ -11,24 +11,23 @@ class UDFInfo: return_type: object name: str - def get_function_call_sql(self, table_name: str) -> str: + def get_function_call_sql(self, table_name: str, cast_to_wkb: bool = False) -> str: arg_offset_str = ", ".join([f"_{el}" for el in self.arg_offsets]) function_expr = f"{self.name}({arg_offset_str})" - if isinstance(self.return_type, GeometryType): + if isinstance(self.return_type, GeometryType) and cast_to_wkb: return f"SELECT ST_GeomToSedonaSpark({function_expr}) AS _0 FROM {table_name}" return f"SELECT {function_expr} AS _0 FROM {table_name}" - def sedona_db_transformation_expr(self, table_name: str) -> str: + def sedona_db_transformation_expr(self, table_name: str, cast_to_wkb: bool = False) -> str: fields = [] for arg in self.arg_offsets: - if arg in self.geom_offsets: + if arg in self.geom_offsets and cast_to_wkb: crs = self.geom_offsets[arg] fields.append(f"ST_GeomFromSedonaSpark(_{arg}, 'EPSG:{crs}') AS _{arg}") continue fields.append(f"_{arg}") - fields_expr = ", ".join(fields) return f"SELECT {fields_expr} FROM {table_name}" diff --git a/python/sedona/spark/worker/worker.py b/python/sedona/spark/worker/worker.py index 6b2a18c8f2..17dae02e63 100644 --- a/python/sedona/spark/worker/worker.py +++ b/python/sedona/spark/worker/worker.py @@ -15,16 +15,17 @@ from sedona.spark.worker.serde import SedonaDBSerializer 
from sedona.spark.worker.udf_info import UDFInfo -def apply_iterator(db, iterator, udf_info: UDFInfo): +def apply_iterator(db, iterator, udf_info: UDFInfo, cast_to_wkb: bool = False): i = 0 for df in iterator: i+=1 table_name = f"output_table_{i}" df.to_view(table_name) - function_call_sql = udf_info.get_function_call_sql(table_name) + function_call_sql = udf_info.get_function_call_sql(table_name, cast_to_wkb=cast_to_wkb) df_out = db.sql(function_call_sql) + df_out.to_view(f"view_{i}") at = df_out.to_arrow_table() batches = at.combine_chunks().to_batches() @@ -207,9 +208,9 @@ def main(infile, outfile): pickle_ser = CPickleSerializer() split_index = read_int(infile) - # + check_python_version(utf8_deserializer, infile) - # + check_barrier_flag(infile) task_context = assign_task_context(utf_serde=utf8_deserializer, infile=infile) @@ -217,7 +218,7 @@ def main(infile, outfile): shuffle.DiskBytesSpilled = 0 resolve_python_path(utf8_deserializer, infile) - # + check_broadcast_variables(infile) eval_type = read_int(infile) @@ -229,11 +230,14 @@ def main(infile, outfile): sedona_db.register_udf(udf.function) init_time = time.time() + cast_to_wkb = read_bool(infile) + serde = SedonaDBSerializer( timezone=runner_conf.get("spark.sql.session.timeZone", "UTC"), safecheck=False, db=sedona_db, - udf_info=udf + udf_info=udf, + cast_to_wkb=cast_to_wkb ) number_of_geometries = read_int(infile) @@ -247,7 +251,7 @@ def main(infile, outfile): udf.geom_offsets = geom_offsets iterator = serde.load_stream(infile) - out_iterator = apply_iterator(db=sedona_db, iterator=iterator, udf_info=udf) + out_iterator = apply_iterator(db=sedona_db, iterator=iterator, udf_info=udf, cast_to_wkb=cast_to_wkb) serde.dump_stream(out_iterator, outfile) diff --git a/python/setup.py b/python/setup.py new file mode 100644 index 0000000000..66ab74701b --- /dev/null +++ b/python/setup.py @@ -0,0 +1,6 @@ +from setuptools import setup +import numpy + +setup( + include_dirs=[numpy.get_include()], +) diff --git a/python/src/geom_buf.c b/python/src/geom_buf.c index 5239de5ae0..d6a51bb3d0 100644 --- a/python/src/geom_buf.c +++ b/python/src/geom_buf.c @@ -208,6 +208,8 @@ SedonaErrorCode geom_buf_alloc(GeomBuffer *geom_buf, return SEDONA_SUCCESS; } +#include <stdio.h> + SedonaErrorCode read_geom_buf_header(const char *buf, int buf_size, GeomBuffer *geom_buf, CoordinateSequenceInfo *cs_info, diff --git a/python/src/geomserde.c b/python/src/geomserde.c index c1f7427738..81dafe216f 100644 --- a/python/src/geomserde.c +++ b/python/src/geomserde.c @@ -718,6 +718,7 @@ static SedonaErrorCode deserialize_geom_buf(GEOSContextHandle_t handle, return SEDONA_SUCCESS; } +#include <stdio.h> SedonaErrorCode sedona_deserialize_geom(GEOSContextHandle_t handle, const char *buf, int buf_size, GEOSGeometry **p_geom, diff --git a/python/src/geomserde_speedup_module.c b/python/src/geomserde_speedup_module.c index a95ced29e5..621f956cd0 100644 --- a/python/src/geomserde_speedup_module.c +++ b/python/src/geomserde_speedup_module.c @@ -20,10 +20,15 @@ #define PY_SSIZE_T_CLEAN #include <Python.h> #include <stdio.h> +// +//#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION #include "geomserde.h" #include "geos_c_dyn.h" #include "pygeos/c_api.h" +#include <numpy/ndarraytypes.h> +#include <numpy/npy_3kcompat.h> +#include <numpy/ufuncobject.h> PyDoc_STRVAR(module_doc, "Geometry serialization/deserialization module."); @@ -225,7 +230,7 @@ static PyObject *serialize(PyObject *self, PyObject *args) { return do_serialize(geos_geom); } -static PyObject *deserialize(PyObject *self, 
PyObject *args) { +static PyObject *deserialize_2(PyObject *self, PyObject *args) { GEOSContextHandle_t handle = NULL; int length = 0; GEOSGeometry *geom = do_deserialize(args, &handle, &length); @@ -262,16 +267,106 @@ static PyObject *deserialize_1(PyObject *self, PyObject *args) { return Py_BuildValue("(Kibi)", geom, geom_type_id, has_z, length); } +static PyObject *to_sedona_func(PyObject *self, PyObject *args) { + import_array(); + PyObject *input_obj = NULL; + if (!PyArg_ParseTuple(args, "O", &input_obj)){ + return NULL; + }; + + PyArrayObject *array = (PyArrayObject *)input_obj; + PyObject **objs = (PyObject **)PyArray_DATA(array); + + GEOSContextHandle_t handle = get_geos_context_handle(); + if (handle == NULL) { + return NULL; + } + + npy_intp n = PyArray_SIZE(input_obj); + npy_intp dims[1] = {n}; + PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT); + for (npy_intp i = 0; i < PyArray_SIZE(array); i++) { + PyObject *obj = objs[i]; + GEOSGeometry *geos_geom = NULL; + char success = PyGEOS_GetGEOSGeometry(obj, &geos_geom); + + PyObject *serialized = do_serialize(geos_geom); + PyArray_SETITEM(out, PyArray_GETPTR1(out, i), serialized); + } + + return out; +} /* Module definition for Shapely 2.x */ +static PyObject *from_sedona_func(PyObject *self, PyObject *args) { + import_array(); + PyObject *input_obj = NULL; + if (!PyArg_ParseTuple(args, "O", &input_obj)){ + return NULL; + }; + + GEOSContextHandle_t handle = get_geos_context_handle(); + + PyArrayObject *array = (PyArrayObject *)input_obj; + PyObject **objs = (PyObject **)PyArray_DATA(array); + + int p_bytes_read = 0; + + npy_intp n = PyArray_SIZE(input_obj); + + npy_intp dims[1] = {n}; + PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT); + + for (npy_intp i = 0; i < PyArray_SIZE(array); i++) { + PyObject *obj = objs[i]; + if (!PyBytes_Check(obj)) { + PyErr_SetString(PyExc_TypeError, "Expected bytes"); + return NULL; + } + + char *buf = PyBytes_AS_STRING(obj); + + Py_ssize_t len = PyBytes_GET_SIZE(obj); + + GEOSGeometry *geom = NULL; + + SedonaErrorCode err = sedona_deserialize_geom(handle, buf, len, &geom, &p_bytes_read); + if (err != SEDONA_SUCCESS) { + handle_geomserde_error(err); + return NULL; + } + PyObject *pygeom = PyGEOS_CreateGeometry(geom, handle); + + PyArray_SETITEM(out, PyArray_GETPTR1(out, i), pygeom); + } + + return out; +} + static PyMethodDef geomserde_methods_shapely_2[] = { {"load_libgeos_c", load_libgeos_c, METH_VARARGS, "Load libgeos_c."}, {"serialize", serialize, METH_VARARGS, "Serialize geometry object as bytearray."}, - {"deserialize", deserialize, METH_VARARGS, + {"deserialize_2", deserialize_2, METH_VARARGS, + "Deserialize bytes-like object to geometry object."}, + {"from_sedona_func", from_sedona_func, METH_VARARGS, + "Deserialize bytes-like object to geometry object."}, + {"to_sedona_func", to_sedona_func, METH_VARARGS, "Deserialize bytes-like object to geometry object."}, {NULL, NULL, 0, NULL}, /* Sentinel */ }; +// +//static int add_from_sedona_func_to_module(PyObject *m) { +// PyObject *capsule = PyCapsule_New((void *)from_sedona_func, "from_sedona_func", NULL); +// if (capsule == NULL) { +// return -1; +// } +// if (PyModule_AddObject(m, "from_sedona_func", capsule) < 0) { +// Py_DECREF(capsule); +// return -1; +// } +// return 0; +//} static struct PyModuleDef geomserde_module_shapely_2 = { PyModuleDef_HEAD_INIT, "geomserde_speedup", module_doc, 0, diff --git a/python/tests/test_base.py b/python/tests/test_base.py index 911860e416..e240a09758 
100644 --- a/python/tests/test_base.py +++ b/python/tests/test_base.py @@ -72,6 +72,8 @@ class TestBase: ) .config("spark.executor.memory", "10G") \ .config("spark.driver.memory", "10G") \ + .config("sedona.python.worker.udf.daemon.module", "sedona.spark.worker.daemon") \ + .config("sedona.python.worker.daemon.enabled", "false") \ # Pandas on PySpark doesn't work with ANSI mode, which is enabled by default # in Spark 4 .config("spark.sql.ansi.enabled", "false") diff --git a/python/tests/utils/test_sedona_db_vectorized_udf.py b/python/tests/utils/test_sedona_db_vectorized_udf.py index 904d59a282..4b266384fa 100644 --- a/python/tests/utils/test_sedona_db_vectorized_udf.py +++ b/python/tests/utils/test_sedona_db_vectorized_udf.py @@ -1,12 +1,106 @@ +import time + +import numpy as np + from sedona.spark.sql.functions import sedona_db_vectorized_udf +from sedona.spark.utils.udf import to_sedona_func, from_sedona_func from tests.test_base import TestBase import pyarrow as pa import shapely from sedona.sql import GeometryType from pyspark.sql.functions import expr, lit -from pyspark.sql.types import DoubleType, IntegerType +from pyspark.sql.types import DoubleType, IntegerType, ByteType from sedona.spark.sql import ST_X - +from shapely._enum import ParamEnum + +def test_m(): + on_invalid="raise" + wkb = b'\x12\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf0?\x00\x00\x00\x00\x00\x00\xf0?' + geometry = np.asarray([wkb, wkb], dtype=object) + + DecodingErrorOptions = ParamEnum( + "DecodingErrorOptions", {"ignore": 0, "warn": 1, "raise": 2, "fix": 3} + ) + + # print("sss") + + + # <class 'numpy.ndarray'> + # object + # C_CONTIGUOUS : True + # F_CONTIGUOUS : True + # OWNDATA : False + # WRITEABLE : True + # ALIGNED : True + # WRITEBACKIFCOPY : False + # print(type(geometry)) + # print(geometry.dtype) + # print(geometry.flags) + + result = from_sedona_func(geometry) + + result2 = to_sedona_func(result) + +# ensure the input has object dtype, to avoid numpy inferring it as a +# fixed-length string dtype (which removes trailing null bytes upon access +# of array elements) + # + # def from_sedona_func(arr): + # try: + # from . import sedonaserde_vectorized_udf_module + # print(sedonaserde_vectorized_udf_module.from_sedona_func_3(arr)) + # except Exception as e: + # print("Cannot import sedonaserde_vectorized_udf_module:") + # print(e) + # # print() + # return None +# +# def from_wkb(geometry, on_invalid="raise", **kwargs): +# r"""Create geometries from the Well-Known Binary (WKB) representation. +# +# The Well-Known Binary format is defined in the `OGC Simple Features +# Specification for SQL <https://www.opengeospatial.org/standards/sfs>`__. +# +# Parameters +# ---------- +# geometry : str or array_like +# The WKB byte object(s) to convert. +# on_invalid : {"raise", "warn", "ignore", "fix"}, default "raise" +# Indicates what to do when an invalid WKB is encountered. Note that the +# validations involved are very basic, e.g. the minimum number of points +# for the geometry type. For a thorough check, use :func:`is_valid` after +# conversion to geometries. Valid options are: +# +# - raise: an exception will be raised if any input geometry is invalid. +# - warn: a warning will be raised and invalid WKT geometries will be +# returned as ``None``. +# - ignore: invalid geometries will be returned as ``None`` without a +# warning. +# - fix: an effort is made to fix invalid input geometries (currently just +# unclosed rings). If this is not possible, they are returned as +# ``None`` without a warning. 
Requires GEOS >= 3.11. +# +# .. versionadded:: 2.1.0 +# **kwargs +# See :ref:`NumPy ufunc docs <ufuncs.kwargs>` for other keyword arguments. +# +# Examples +# -------- +# >>> import shapely +# >>> shapely.from_wkb(b'\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf0?\x00\x00\x00\x00\x00\x00\xf0?') +# <POINT (1 1)> +# +# """ # noqa: E501 +# if not np.isscalar(on_invalid): +# raise TypeError("on_invalid only accepts scalar values") +# +# invalid_handler = np.uint8(DecodingErrorOptions.get_value(on_invalid)) +# +# # ensure the input has object dtype, to avoid numpy inferring it as a +# # fixed-length string dtype (which removes trailing null bytes upon access +# # of array elements) +# geometry = np.asarray(geometry, dtype=object) +# return lib.from_wkb(geometry, invalid_handler, **kwargs) class TestSedonaDBArrowFunction(TestBase): def test_vectorized_udf(self): @@ -15,7 +109,6 @@ class TestSedonaDBArrowFunction(TestBase): geom_wkb = pa.array(geom.storage.to_array()) distance = pa.array(distance.to_array()) geom = shapely.from_wkb(geom_wkb) - result_shapely = shapely.centroid(geom) return pa.array(shapely.to_wkb(result_shapely)) @@ -95,18 +188,26 @@ class TestSedonaDBArrowFunction(TestBase): assert crs_list == [3857, 3857, 3857] def test_geometry_to_geometry(self): - @sedona_db_vectorized_udf(return_type=GeometryType(), input_types=[GeometryType()]) + @sedona_db_vectorized_udf(return_type=GeometryType(), input_types=[ByteType()]) def buffer_geometry(geom): geom_wkb = pa.array(geom.storage.to_array()) - geom = shapely.from_wkb(geom_wkb) + geometry_array = np.asarray(geom_wkb, dtype=object) + geom = from_sedona_func(geometry_array) result_shapely = shapely.buffer(geom, 10) - return pa.array(shapely.to_wkb(result_shapely)) + return pa.array(to_sedona_func(result_shapely)) df = self.spark.read.\ format("geoparquet").\ - load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings_large_3") + load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/book/source_data/transportation_barcelona_dupl_l1") + + # 1 045 770 + # print(df.count()) + + # df.unionAll(df).unionAll(df).unionAll(df).unionAll(df).unionAll(df).\ + # unionAll(df).unionAll(df).unionAll(df).unionAll(df).unionAll(df).\ + # write.format("geoparquet").mode("overwrite").save("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/book/source_data/transportation_barcelona_dupl_l2") # 18 24 # df.union(df).union(df).union(df).union(df).union(df).union(df).\ # write.format("geoparquet").mode("overwrite").save("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings_large_3") @@ -117,6 +218,12 @@ class TestSedonaDBArrowFunction(TestBase): values.show() + # for _ in range(4): + # start_time = time.time() + # values.show() + # end_time = time.time() + # print(f"Execution time: {end_time - start_time} seconds") + def test_geometry_to_geometry_normal_udf(self): from pyspark.sql.functions import udf @@ -127,10 +234,19 @@ class TestSedonaDBArrowFunction(TestBase): df = self.spark.read. \ format("geoparquet"). \ - load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings_large_3") + load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/book/source_data/transportation_barcelona_dupl_l2") + # print(df.count()) + # df.limit(10).collect() values = df.select(create_buffer_udf(df.geometry).alias("geometry")). \ selectExpr("ST_Area(geometry) as area"). 
\ selectExpr("Sum(area) as total_area") values.show() + + # for _ in range(4): + # start_time = time.time() + # values.show() + # end_time = time.time() + # print(f"Execution time: {end_time - start_time} seconds") +# 1 045 770 diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala index 11263dd7f6..0f1a5fe0a0 100644 --- a/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala +++ b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala @@ -25,6 +25,7 @@ object PythonEvalType { // sedona db eval types val SQL_SCALAR_SEDONA_DB_UDF = 6200 + val SQL_SCALAR_SEDONA_DB_NO_SPEEDUP_UDF = 6201 val SEDONA_DB_UDF_TYPE_CONSTANT = 6000 def toString(pythonEvalType: Int): String = pythonEvalType match { @@ -32,5 +33,6 @@ object PythonEvalType { case SQL_SCALAR_SEDONA_DB_UDF => "SQL_SCALAR_SEDONA_DB_UDF" } - def evals(): Set[Int] = Set(SQL_SCALAR_SEDONA_UDF, SQL_SCALAR_SEDONA_DB_UDF) + def evals(): Set[Int] = + Set(SQL_SCALAR_SEDONA_UDF, SQL_SCALAR_SEDONA_DB_UDF, SQL_SCALAR_SEDONA_DB_NO_SPEEDUP_UDF) } diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala index 0d3960d2d8..3055e768b9 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala @@ -38,13 +38,15 @@ class SedonaArrowPythonRunner( protected override val workerConf: Map[String, String], val pythonMetrics: Map[String, SQLMetric], jobArtifactUUID: Option[String], - geometryFields: Seq[(Int, Int)]) + geometryFields: Seq[(Int, Int)], + castGeometryToWKB: Boolean = false) extends SedonaBasePythonRunner[Iterator[InternalRow], ColumnarBatch]( funcs, evalType, argOffsets, jobArtifactUUID, - geometryFields) + geometryFields, + castGeometryToWKB) with SedonaBasicPythonArrowInput with SedonaBasicPythonArrowOutput { diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala index bb897931b6..228ddc2cbc 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.python import org.apache.sedona.sql.UDF.PythonEvalType -import org.apache.sedona.sql.UDF.PythonEvalType.{SQL_SCALAR_SEDONA_DB_UDF, SQL_SCALAR_SEDONA_UDF} +import org.apache.sedona.sql.UDF.PythonEvalType.{SQL_SCALAR_SEDONA_DB_NO_SPEEDUP_UDF, SQL_SCALAR_SEDONA_DB_UDF, SQL_SCALAR_SEDONA_UDF} import org.apache.spark.api.python.ChainedPythonFunctions import org.apache.spark.sql.Strategy import org.apache.spark.sql.catalyst.InternalRow @@ -115,10 +115,10 @@ case class SedonaArrowEvalPythonExec( val batchIter = if (batchSize > 0) new BatchIterator(full, batchSize) else Iterator(full) evalType match { - case SQL_SCALAR_SEDONA_DB_UDF => + case SQL_SCALAR_SEDONA_DB_UDF | SQL_SCALAR_SEDONA_DB_NO_SPEEDUP_UDF => val columnarBatchIter = new SedonaArrowPythonRunner( funcs, - evalType - PythonEvalType.SEDONA_DB_UDF_TYPE_CONSTANT, + 200, argOffsets, schema, sessionLocalTimeZone, @@ -126,7 +126,9 @@ case class 
SedonaArrowEvalPythonExec( pythonRunnerConf, pythonMetrics, jobArtifactUUID, - geometryFields).compute(batchIter, context.partitionId(), context) + geometryFields, + evalType == SQL_SCALAR_SEDONA_DB_UDF) + .compute(batchIter, context.partitionId(), context) val result = columnarBatchIter.flatMap { batch => batch.rowIterator.asScala diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala index 276383a0ee..055d5db15f 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala @@ -39,14 +39,16 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT]( evalType: Int, argOffsets: Array[Array[Int]], jobArtifactUUID: Option[String], - val geometryFields: Seq[(Int, Int)] = Seq.empty) + val geometryFields: Seq[(Int, Int)] = Seq.empty, + val castGeometryToWKB: Boolean = false) extends BasePythonRunner[IN, OUT](funcs, evalType, argOffsets, jobArtifactUUID) with Logging { require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs") private val conf = SparkEnv.get.conf - private val reuseWorker = conf.getBoolean(PYTHON_WORKER_REUSE.key, PYTHON_WORKER_REUSE.defaultValue.get) + private val reuseWorker = + conf.getBoolean(PYTHON_WORKER_REUSE.key, PYTHON_WORKER_REUSE.defaultValue.get) private val faultHandlerEnabled = conf.get(PYTHON_WORKER_FAULTHANLDER_ENABLED) private def getWorkerMemoryMb(mem: Option[Long], cores: Int): Option[Long] = { @@ -81,9 +83,12 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT]( envVars.put("PYTHON_FAULTHANDLER_DIR", SedonaBasePythonRunner.faultHandlerLogDir.toString) } + if (reuseWorker) { + envVars.put("SPARK_REUSE_WORKER", "1") + } + envVars.put("SPARK_JOB_ARTIFACT_UUID", jobArtifactUUID.getOrElse("default")) - println("running the compute for SedonaBasePythonRunner and partition index: " + partitionIndex) val (worker: Socket, pid: Option[Int]) = { WorkerContext.createPythonWorker(pythonExec, envVars.asScala.toMap) } @@ -98,7 +103,6 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT]( if (!reuseWorker || releasedOrClosed.compareAndSet(false, true)) { try { - logInfo("Shutting down worker socket") worker.close() } catch { case e: Exception => diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala index 93bcaee0c6..459388856b 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala @@ -34,7 +34,6 @@ import org.apache.spark.security.SocketAuthHelper import org.apache.spark.sql.execution.python.SedonaPythonWorkerFactory.PROCESS_WAIT_TIMEOUT_MS import org.apache.spark.util.RedirectThread -import java.util.concurrent.TimeUnit import javax.annotation.concurrent.GuardedBy class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String]) extends Logging { @@ -181,7 +180,6 @@ class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String]) ex } private def stopDaemon(): Unit = { - logError("daemon stopping called") self.synchronized { if (useDaemon) { cleanupIdleWorkers() @@ -194,7 
+192,6 @@ class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String]) ex daemon = null daemonPort = 0 } else { - println("Stopping simple workers") simpleWorkers.mapValues(_.destroy()) } } @@ -233,11 +230,11 @@ class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String]) ex daemonPort = in.readInt() } catch { case _: EOFException if daemon.isAlive => - throw SparkCoreErrors.eofExceptionWhileReadPortNumberError( - sedonaDaemonModule) + throw SparkCoreErrors.eofExceptionWhileReadPortNumberError(sedonaDaemonModule) case _: EOFException => - throw SparkCoreErrors. - eofExceptionWhileReadPortNumberError(sedonaDaemonModule, Some(daemon.exitValue)) + throw SparkCoreErrors.eofExceptionWhileReadPortNumberError( + sedonaDaemonModule, + Some(daemon.exitValue)) } // test that the returned port number is within a valid range. @@ -261,7 +258,6 @@ class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String]) ex redirectStreamsToStderr(in, daemon.getErrorStream) } catch { case e: Exception => - // If the daemon exists, wait for it to finish and get its stderr val stderr = Option(daemon) .flatMap { d => Utils.getStderr(d, PROCESS_WAIT_TIMEOUT_MS) } @@ -307,7 +303,6 @@ class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String]) ex def releaseWorker(worker: Socket): Unit = { if (useDaemon) { - logInfo("Releasing worker back to daemon pool") self.synchronized { lastActivityNs = System.nanoTime() idleWorkers.enqueue(worker) @@ -345,5 +340,4 @@ class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String]) ex private object SedonaPythonWorkerFactory { val PROCESS_WAIT_TIMEOUT_MS = 10000 - val IDLE_WORKER_TIMEOUT_NS = TimeUnit.MINUTES.toNanos(1) // kill idle workers after 1 minute } diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala index 18db42ae0d..2544e63a97 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala @@ -60,6 +60,9 @@ private[python] trait SedonaPythonArrowInput[IN] extends PythonArrowInput[IN] { handleMetadataBeforeExec(dataOut) writeUDF(dataOut, funcs, argOffsets) + // if speedup is not available and we need to use casting + dataOut.writeBoolean(self.castGeometryToWKB) + // write dataOut.writeInt(self.geometryFields.length) // write geometry field indices and their SRIDs diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala index 0c0b220933..27764c2a54 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala @@ -34,7 +34,8 @@ import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnVector, Columna private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[_, OUT] => - private val reuseWorker = SparkEnv.get.conf.getBoolean(PYTHON_WORKER_REUSE.key, PYTHON_WORKER_REUSE.defaultValue.get) + private val reuseWorker = + SparkEnv.get.conf.getBoolean(PYTHON_WORKER_REUSE.key, PYTHON_WORKER_REUSE.defaultValue.get) protected def pythonMetrics: Map[String, 
SQLMetric] diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala index 82fe6dedda..6411bec97e 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala @@ -24,24 +24,28 @@ import scala.collection.mutable object WorkerContext { def createPythonWorker( - pythonExec: String, - envVars: Map[String, String]): (java.net.Socket, Option[Int]) = { + pythonExec: String, + envVars: Map[String, String]): (java.net.Socket, Option[Int]) = { synchronized { val key = (pythonExec, envVars) pythonWorkers.getOrElseUpdate(key, new SedonaDBWorkerFactory(pythonExec, envVars)).create() } } - def destroyPythonWorker(pythonExec: String, - envVars: Map[String, String], worker: Socket): Unit = { + def destroyPythonWorker( + pythonExec: String, + envVars: Map[String, String], + worker: Socket): Unit = { synchronized { val key = (pythonExec, envVars) pythonWorkers.get(key).foreach(_.stopWorker(worker)) } } - def releasePythonWorker(pythonExec: String, - envVars: Map[String, String], worker: Socket): Unit = { + def releasePythonWorker( + pythonExec: String, + envVars: Map[String, String], + worker: Socket): Unit = { synchronized { val key = (pythonExec, envVars) pythonWorkers.get(key).foreach(_.releaseWorker(worker)) diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala index ebb5a568e1..3584cb01bd 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala @@ -44,7 +44,8 @@ class ExtractSedonaUDFRule extends Rule[LogicalPlan] with Logging { } def isScalarPythonUDF(e: Expression): Boolean = { - e.isInstanceOf[PythonUDF] && PythonEvalType.evals.contains(e.asInstanceOf[PythonUDF].evalType) + e.isInstanceOf[PythonUDF] && + PythonEvalType.evals.contains(e.asInstanceOf[PythonUDF].evalType) } private def collectEvaluableUDFsFromExpressions( diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala index e64e9dec3b..c9b4d6ac28 100644 --- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala +++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala @@ -46,9 +46,9 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll { // We need to be explicit about broadcasting in tests. 
.config("sedona.join.autoBroadcastJoinThreshold", "-1") .config("spark.sql.extensions", "org.apache.sedona.sql.SedonaSqlExtensions") - .config("sedona.python.worker.udf.module", "sedonaworker.worker") + .config("sedona.python.worker.udf.module", "sedona.spark.worker.worker") .config("sedona.python.worker.udf.daemon.module", "sedonaworker.daemon") - .config("sedona.python.worker.daemon.enabled", "true") + .config("sedona.python.worker.daemon.enabled", "false") // .config(keyParserExtension, ThreadLocalRandom.current().nextBoolean()) .getOrCreate() diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala index 000c1f55b6..4fe4acfb12 100644 --- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala +++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala @@ -35,7 +35,6 @@ class StrategySuite extends TestBaseScala with Matchers { import spark.implicits._ - it("sedona geospatial UDF - geopandas") { val df = Seq( (1, "value", wktReader.read("POINT(21 52)")), @@ -50,7 +49,8 @@ class StrategySuite extends TestBaseScala with Matchers { geopandasUDFDF.count shouldEqual 5 - geopandasUDFDF.selectExpr("ST_AsText(ST_ReducePrecision(geom_buffer, 2))") + geopandasUDFDF + .selectExpr("ST_AsText(ST_ReducePrecision(geom_buffer, 2))") .as[String] .collect() should contain theSameElementsAs Seq( "POLYGON ((20 51, 20 53, 22 53, 22 51, 20 51))", @@ -61,36 +61,33 @@ class StrategySuite extends TestBaseScala with Matchers { } it("sedona geospatial UDF - sedona db") { -// val df = Seq( -// (1, "value", wktReader.read("POINT(21 52)")), -// (2, "value1", wktReader.read("POINT(20 50)")), -// (3, "value2", wktReader.read("POINT(20 49)")), -// (4, "value3", wktReader.read("POINT(20 48)")), -// (5, "value4", wktReader.read("POINT(20 47)"))) -// .toDF("id", "value", "geom") -// -// val dfVectorized = df -// .withColumn("geometry", expr("ST_SetSRID(geom, '4326')")) -// .select(sedonaDBGeometryToGeometryFunction(col("geometry"), lit(100)).alias("geom")) - -// dfVectorized.selectExpr("ST_X(ST_Centroid(geom)) AS x") -// .selectExpr("sum(x)") -// .as[Double] -// .collect().head shouldEqual 101 - - val dfCopied = sparkSession.read - .format("geoparquet") - .load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/book/source_data/transportation_barcelona/barcelona.geoparquet") + val df = Seq( + (1, "value", wktReader.read("POINT(21 52)")), + (2, "value1", wktReader.read("POINT(20 50)")), + (3, "value2", wktReader.read("POINT(20 49)")), + (4, "value3", wktReader.read("POINT(20 48)")), + (5, "value4", wktReader.read("POINT(20 47)"))) + .toDF("id", "value", "geom") - val values = dfCopied.unionAll(dfCopied) - .unionAll(dfCopied) -// .unionAll(dfCopied) -// .unionAll(dfCopied) -// .unionAll(dfCopied) - .select(sedonaDBGeometryToGeometryFunction(col("geometry"), lit(10)).alias("geom")) - .selectExpr("ST_Area(geom) as area") - .selectExpr("Sum(area) as total_area") + val dfVectorized = df + .withColumn("geometry", expr("ST_SetSRID(geom, '4326')")) + .select(sedonaDBGeometryToGeometryFunction(col("geometry"), lit(100)).alias("geom")) - values.show() + dfVectorized.selectExpr("ST_X(ST_Centroid(geom)) AS x") + .selectExpr("sum(x)") + .as[Double] + .collect().head shouldEqual 101 +// +// val dfCopied = sparkSession.read +// .format("geoparquet") +// .load( +// 
"/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/book/source_data/transportation_barcelona/barcelona.geoparquet") +// +// val values = dfCopied +// .select(sedonaDBGeometryToGeometryFunction(col("geometry"), lit(10)).alias("geom")) +// .selectExpr("ST_Area(geom) as area") +// .selectExpr("Sum(area) as total_area") +// +// values.show() } }