This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch add-sedona-worker-daemon-mode
in repository https://gitbox.apache.org/repos/asf/sedona.git
commit 42229e5f66bb0cf032348799b358991c77e3e63a
Author: pawelkocinski <[email protected]>
AuthorDate: Tue Jan 13 23:23:05 2026 +0100

    add sedonadb sedona udf worker example
---
 python/sedona/spark/sql/functions.py               |  29 ++-
 python/sedona/spark/utils/udf.py                   |  22 ++-
 python/sedona/spark/worker/__init__.py             |  16 ++
 python/sedona/spark/worker/daemon.py               |  48 +++--
 python/sedona/spark/worker/serde.py                |  25 ++-
 python/sedona/spark/worker/udf_info.py             |  25 ++-
 python/sedona/spark/worker/worker.py               |  56 ++++--
 python/setup.py                                    |  17 ++
 python/src/geomserde_speedup_module.c              | 138 ++++++-------
 python/tests/test_base.py                          |  14 +-
 .../tests/utils/test_sedona_db_vectorized_udf.py   | 218 +++++----------------
 .../org/apache/spark/sql/udf/StrategySuite.scala   |  18 +-
 12 files changed, 311 insertions(+), 315 deletions(-)

diff --git a/python/sedona/spark/sql/functions.py b/python/sedona/spark/sql/functions.py
index 232ccb50a3..d8bf73c152 100644
--- a/python/sedona/spark/sql/functions.py
+++ b/python/sedona/spark/sql/functions.py
@@ -28,7 +28,14 @@ import pyarrow as pa
 import geoarrow.pyarrow as ga
 from sedonadb import udf as sedona_udf_module
 from sedona.spark.sql.types import GeometryType
-from pyspark.sql.types import DataType, FloatType, DoubleType, IntegerType, StringType, ByteType
+from pyspark.sql.types import (
+    DataType,
+    FloatType,
+    DoubleType,
+    IntegerType,
+    StringType,
+    ByteType,
+)
 
 from sedona.spark.utils.udf import has_sedona_serializer_speedup
 
@@ -52,7 +59,7 @@ sedona_udf_to_eval_type = {
 
 
 def sedona_vectorized_udf(
-        return_type: DataType, udf_type: SedonaUDFType = SedonaUDFType.SHAPELY_SCALAR
+    return_type: DataType, udf_type: SedonaUDFType = SedonaUDFType.SHAPELY_SCALAR
 ):
     import geopandas as gpd
 
@@ -93,7 +100,7 @@ def sedona_vectorized_udf(
 
 
 def _apply_shapely_series_udf(
-        fn, return_type: DataType, serialize_geom: bool, deserialize_geom: bool
+    fn, return_type: DataType, serialize_geom: bool, deserialize_geom: bool
 ):
     def apply(series: pd.Series) -> pd.Series:
         applied = series.apply(
@@ -114,7 +121,7 @@ def _apply_shapely_series_udf(
 
 
 def _apply_geo_series_udf(
-        fn, return_type: DataType, serialize_geom: bool, deserialize_geom: bool
+    fn, return_type: DataType, serialize_geom: bool, deserialize_geom: bool
 ):
     import geopandas as gpd
 
@@ -166,7 +173,11 @@ def infer_pa_type(spark_type: DataType):
 def infer_input_type(spark_type: DataType):
     if isinstance(spark_type, GeometryType):
         return sedona_udf_module.GEOMETRY
-    elif isinstance(spark_type, FloatType) or isinstance(spark_type, DoubleType) or isinstance(spark_type, IntegerType):
+    elif (
+        isinstance(spark_type, FloatType)
+        or isinstance(spark_type, DoubleType)
+        or isinstance(spark_type, IntegerType)
+    ):
         return sedona_udf_module.NUMERIC
     elif isinstance(spark_type, StringType):
         return sedona_udf_module.STRING
@@ -186,12 +197,12 @@ def infer_input_types(spark_types: list[DataType]):
 
 
 def sedona_db_vectorized_udf(
-        return_type: DataType,
-        input_types: list[DataType],
+    return_type: DataType,
+    input_types: list[DataType],
 ):
-    eval_type = 6201
+    eval_type = 6200
     if has_sedona_serializer_speedup():
-        eval_type = 6200
+        eval_type = 6201
 
     def apply_fn(fn):
         out_type = infer_pa_type(return_type)
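Note: with this change the decorator reports eval type 6200 by default and
6201 only when has_sedona_serializer_speedup() confirms the compiled
serializer is importable (the two constants were previously assigned the
other way around). A minimal sketch of how the decorator is applied, modeled
on the tests later in this commit; the UDF name and body are illustrative,
not part of the change:

    import numpy as np
    import pyarrow as pa
    import shapely
    from pyspark.sql.types import ByteType, DoubleType
    from sedona.spark.sql.functions import sedona_db_vectorized_udf
    from sedona.spark.utils.udf import from_sedona

    @sedona_db_vectorized_udf(return_type=DoubleType(), input_types=[ByteType()])
    def centroid_x(geom):
        # Geometries arrive as Sedona-serialized bytes inside an Arrow
        # extension array; decode to shapely, compute, and return a plain
        # Arrow array of doubles.
        raw = np.asarray(pa.array(geom.storage.to_array()), dtype=object)
        return pa.array(shapely.get_x(shapely.centroid(from_sedona(raw))))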
diff --git a/python/sedona/spark/utils/udf.py b/python/sedona/spark/utils/udf.py
index 01a38a675a..0f88ef07f2 100644
--- a/python/sedona/spark/utils/udf.py
+++ b/python/sedona/spark/utils/udf.py
@@ -1,3 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 import shapely
 
 
@@ -8,7 +25,8 @@ def has_sedona_serializer_speedup():
         return False
     return True
 
-def to_sedona_func(arr):
+
+def to_sedona(arr):
     try:
         from . import geomserde_speedup
     except ImportError:
@@ -17,7 +35,7 @@ def to_sedona_func(arr):
     return geomserde_speedup.to_sedona_func(arr)
 
 
-def from_sedona_func(arr):
+def from_sedona(arr):
     try:
         from . import geomserde_speedup
     except ImportError:
diff --git a/python/sedona/spark/worker/__init__.py b/python/sedona/spark/worker/__init__.py
index e69de29bb2..13a83393a9 100644
--- a/python/sedona/spark/worker/__init__.py
+++ b/python/sedona/spark/worker/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
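Note: the renamed helpers still delegate to the C symbols to_sedona_func /
from_sedona_func exported by geomserde_speedup; only the Python-facing names
get shorter. A round-trip sketch, assuming the compiled extension is
available; per the test notes removed later in this commit, object dtype
matters because numpy would otherwise coerce the payloads to fixed-width
byte strings and drop trailing null bytes:

    import numpy as np
    import shapely
    from sedona.spark.utils.udf import from_sedona, to_sedona

    # shapely geometries -> Sedona-serialized bytes -> shapely geometries
    geoms = np.asarray([shapely.Point(1, 1), shapely.Point(2, 2)], dtype=object)
    payload = np.asarray(to_sedona(geoms), dtype=object)
    restored = from_sedona(payload)
    assert shapely.equals(geoms, restored).all()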
diff --git a/python/sedona/spark/worker/daemon.py b/python/sedona/spark/worker/daemon.py
index 0d64a543c5..ce75e376ea 100644
--- a/python/sedona/spark/worker/daemon.py
+++ b/python/sedona/spark/worker/daemon.py
@@ -1,19 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 import logging
 import numbers
 import os
@@ -39,16 +40,23 @@ def compute_real_exit_code(exit_code):
     else:
         return 1
 
+
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
-file_handler = logging.FileHandler("/Users/pawelkocinski/Desktop/projects/sedonaworker/sedonaworker/logs/worker_daemon_main.log", delay=False)
+file_handler = logging.FileHandler(
+    "/Users/pawelkocinski/Desktop/projects/sedonaworker/sedonaworker/logs/worker_daemon_main.log",
+    delay=False,
+)
 file_handler.flush = file_handler.stream.flush
 logger.addHandler(file_handler)
 
+
 def worker(sock, authenticated):
-    logger.info("Starting worker process with pid =" + str(os.getpid()) + " socket " + str(sock))
+    logger.info(
+        "Starting worker process with pid =" + str(os.getpid()) + " socket " + str(sock)
+    )
     """
     Called by a worker process after the fork().
     """
@@ -69,10 +77,10 @@ def worker(sock, authenticated):
     if not authenticated:
         client_secret = UTF8Deserializer().loads(infile)
         if os.environ["PYTHON_WORKER_FACTORY_SECRET"] == client_secret:
-            write_with_length("ok".encode("utf-8"), outfile)
+            write_with_length(b"ok", outfile)
             outfile.flush()
         else:
-            write_with_length("err".encode("utf-8"), outfile)
+            write_with_length(b"err", outfile)
             outfile.flush()
             sock.close()
             return 1
@@ -132,7 +140,7 @@ def manager():
         while True:
             try:
                 ready_fds = select.select([0, listen_sock], [], [], 1)[0]
-            except select.error as ex:
+            except OSError as ex:
                 if ex[0] == EINTR:
                     continue
                 else:
                     raise
@@ -186,7 +194,7 @@ def manager():
             # Therefore, here we redirects it to '/dev/null' by duplicating
             # another file descriptor for '/dev/null' to the standard input (0).
             # See SPARK-26175.
-            devnull = open(os.devnull, "r")
+            devnull = open(os.devnull)
             os.dup2(devnull.fileno(), 0)
             devnull.close()
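Note: switching the handler from select.error to OSError is the right
Python 3 spelling, but OSError is not indexable, so ex[0] would itself raise
TypeError if that branch were ever taken; the conventional idiom reads the
errno attribute instead (and on Python 3.5+, PEP 475 retries EINTR inside
select.select automatically, making the branch largely vestigial). A sketch
of the attribute-based form, not part of this commit:

    import errno
    import select

    def select_with_retry(rlist, timeout=1):
        # Retry select() when a signal interrupts it (EINTR); re-raise
        # every other OS-level error.
        while True:
            try:
                return select.select(rlist, [], [], timeout)[0]
            except OSError as ex:
                if ex.errno != errno.EINTR:
                    raise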
diff --git a/python/sedona/spark/worker/serde.py b/python/sedona/spark/worker/serde.py
index 5a33a26610..52e7b663a5 100644
--- a/python/sedona/spark/worker/serde.py
+++ b/python/sedona/spark/worker/serde.py
@@ -1,11 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 from pyspark.serializers import write_int, SpecialLengths
 from pyspark.sql.pandas.serializers import ArrowStreamPandasSerializer
 
 from sedona.spark.worker.udf_info import UDFInfo
 
+
 class SedonaDBSerializer(ArrowStreamPandasSerializer):
     def __init__(self, timezone, safecheck, db, udf_info: UDFInfo, cast_to_wkb=False):
-        super(SedonaDBSerializer, self).__init__(timezone, safecheck)
+        super().__init__(timezone, safecheck)
         self.db = db
         self.udf_info = udf_info
         self.cast_to_wkb = cast_to_wkb
@@ -18,12 +36,15 @@ class SedonaDBSerializer(ArrowStreamPandasSerializer):
         for batch in batches:
             table = pa.Table.from_batches(batches=[batch])
             import pyarrow as pa
+
             df = self.db.create_data_frame(table)
             table_name = f"my_table_{index}"
             df.to_view(table_name)
-            sql_expression = self.udf_info.sedona_db_transformation_expr(table_name, self.cast_to_wkb)
+            sql_expression = self.udf_info.sedona_db_transformation_expr(
+                table_name, self.cast_to_wkb
+            )
 
             index += 1
diff --git a/python/sedona/spark/worker/udf_info.py b/python/sedona/spark/worker/udf_info.py
index 7853133e77..eb278a1511 100644
--- a/python/sedona/spark/worker/udf_info.py
+++ b/python/sedona/spark/worker/udf_info.py
@@ -1,3 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 from dataclasses import dataclass
 
 from sedona.spark import GeometryType
@@ -15,11 +32,15 @@ class UDFInfo:
         arg_offset_str = ", ".join([f"_{el}" for el in self.arg_offsets])
         function_expr = f"{self.name}({arg_offset_str})"
         if isinstance(self.return_type, GeometryType) and cast_to_wkb:
-            return f"SELECT ST_GeomToSedonaSpark({function_expr}) AS _0 FROM {table_name}"
+            return (
+                f"SELECT ST_GeomToSedonaSpark({function_expr}) AS _0 FROM {table_name}"
+            )
 
         return f"SELECT {function_expr} AS _0 FROM {table_name}"
 
-    def sedona_db_transformation_expr(self, table_name: str, cast_to_wkb: bool = False) -> str:
+    def sedona_db_transformation_expr(
+        self, table_name: str, cast_to_wkb: bool = False
+    ) -> str:
         fields = []
         for arg in self.arg_offsets:
             if arg in self.geom_offsets and cast_to_wkb:
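Note: UDFInfo is what turns the control-stream metadata into the SQL that
SedonaDB executes per Arrow batch. An illustrative call, with field values
mimicking what the worker reads for a two-argument UDF returning geometry
(the exact dataclass field list is inferred from the hunks above, so treat
the constructor as a sketch):

    from sedona.spark import GeometryType
    from sedona.spark.worker.udf_info import UDFInfo

    udf = UDFInfo(
        name="my_udf",  # hypothetical registered function name
        function=None,
        return_type=GeometryType(),
        arg_offsets=[0, 1],
        geom_offsets=[0],
    )
    udf.get_function_call_sql("output_table_1", cast_to_wkb=True)
    # -> "SELECT ST_GeomToSedonaSpark(my_udf(_0, _1)) AS _0 FROM output_table_1"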
diff --git a/python/sedona/spark/worker/worker.py b/python/sedona/spark/worker/worker.py
index 17dae02e63..02fedf0058 100644
--- a/python/sedona/spark/worker/worker.py
+++ b/python/sedona/spark/worker/worker.py
@@ -1,3 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 import importlib
 import os
 import sys
@@ -8,8 +25,16 @@ from pyspark import TaskContext, shuffle, SparkFiles
 from pyspark.errors import PySparkRuntimeError
 from pyspark.java_gateway import local_connect_and_auth
 from pyspark.resource import ResourceInformation
-from pyspark.serializers import read_int, UTF8Deserializer, read_bool, read_long, CPickleSerializer, write_int, \
-    write_long, SpecialLengths
+from pyspark.serializers import (
+    read_int,
+    UTF8Deserializer,
+    read_bool,
+    read_long,
+    CPickleSerializer,
+    write_int,
+    write_long,
+    SpecialLengths,
+)
 
 from sedona.spark.worker.serde import SedonaDBSerializer
 from sedona.spark.worker.udf_info import UDFInfo
@@ -18,11 +43,13 @@ from sedona.spark.worker.udf_info import UDFInfo
 def apply_iterator(db, iterator, udf_info: UDFInfo, cast_to_wkb: bool = False):
     i = 0
     for df in iterator:
-        i+=1
+        i += 1
         table_name = f"output_table_{i}"
         df.to_view(table_name)
 
-        function_call_sql = udf_info.get_function_call_sql(table_name, cast_to_wkb=cast_to_wkb)
+        function_call_sql = udf_info.get_function_call_sql(
+            table_name, cast_to_wkb=cast_to_wkb
+        )
 
         df_out = db.sql(function_call_sql)
 
@@ -30,8 +57,7 @@ def apply_iterator(db, iterator, udf_info: UDFInfo, cast_to_wkb: bool = False):
         at = df_out.to_arrow_table()
         batches = at.combine_chunks().to_batches()
 
-        for batch in batches:
-            yield batch
+        yield from batches
 
 
 def check_python_version(utf_serde: UTF8Deserializer, infile) -> str:
@@ -50,6 +76,7 @@ def check_python_version(utf_serde: UTF8Deserializer, infile) -> str:
 
     return version
 
+
 def check_barrier_flag(infile):
     is_barrier = read_bool(infile)
     bound_port = read_int(infile)
@@ -66,6 +93,7 @@ def check_barrier_flag(infile):
 
     return is_barrier
 
+
 def assign_task_context(utf_serde: UTF8Deserializer, infile):
     stage_id = read_int(infile)
     partition_id = read_int(infile)
@@ -97,6 +125,7 @@ def assign_task_context(utf_serde: UTF8Deserializer, infile):
 
     return task_context
 
+
 def resolve_python_path(utf_serde: UTF8Deserializer, infile):
     def add_path(path: str):
         # worker can be used, so do not add path multiple times
@@ -131,6 +160,7 @@ def check_broadcast_variables(infile):
         },
     )
 
+
 def get_runner_conf(utf_serde: UTF8Deserializer, infile):
     runner_conf = {}
     num_conf = read_int(infile)
@@ -145,6 +175,7 @@ def read_command(serializer, infile):
     command = serializer._read_with_length(infile)
     return command
 
+
 def read_udf(infile, pickle_ser) -> UDFInfo:
     num_arg = read_int(infile)
     arg_offsets = [read_int(infile) for i in range(num_arg)]
@@ -162,9 +193,10 @@ def read_udf(infile, pickle_ser) -> UDFInfo:
         function=sedona_db_udf_expression,
         return_type=return_type,
         name=sedona_db_udf_expression._name,
-        geom_offsets=[0]
+        geom_offsets=[0],
     )
 
+
 def register_sedona_db_udf(infile, pickle_ser) -> UDFInfo:
     num_udfs = read_int(infile)
 
@@ -237,7 +269,7 @@ def main(infile, outfile):
             safecheck=False,
             db=sedona_db,
             udf_info=udf,
-            cast_to_wkb=cast_to_wkb
+            cast_to_wkb=cast_to_wkb,
         )
 
         number_of_geometries = read_int(infile)
@@ -251,13 +283,13 @@ def main(infile, outfile):
         udf.geom_offsets = geom_offsets
 
         iterator = serde.load_stream(infile)
-        out_iterator = apply_iterator(db=sedona_db, iterator=iterator, udf_info=udf, cast_to_wkb=cast_to_wkb)
+        out_iterator = apply_iterator(
+            db=sedona_db, iterator=iterator, udf_info=udf, cast_to_wkb=cast_to_wkb
+        )
 
         serde.dump_stream(out_iterator, outfile)
 
-        write_statistics(
-            infile, outfile, boot_time=boot_time, init_time=init_time
-        )
+        write_statistics(infile, outfile, boot_time=boot_time, init_time=init_time)
 
 
 if __name__ == "__main__":
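Note: condensed, the worker's per-batch loop does view registration, SQL
execution, and Arrow streaming in a single pass; db below stands in for the
SedonaDB session that main() creates:

    def run_udf_over_frames(db, frames, udf_info, cast_to_wkb=False):
        # Each incoming Arrow batch is already wrapped in a SedonaDB frame
        # by SedonaDBSerializer; expose it as a view, run the generated
        # SQL, and stream the result back out as Arrow record batches.
        for i, df in enumerate(frames, start=1):
            df.to_view(f"output_table_{i}")
            sql = udf_info.get_function_call_sql(
                f"output_table_{i}", cast_to_wkb=cast_to_wkb
            )
            yield from db.sql(sql).to_arrow_table().combine_chunks().to_batches()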
diff --git a/python/setup.py b/python/setup.py
index 66ab74701b..ae5e7bf174 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -1,3 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 from setuptools import setup
 
 import numpy
diff --git a/python/src/geomserde_speedup_module.c b/python/src/geomserde_speedup_module.c
index 621f956cd0..1d7aefcd77 100644
--- a/python/src/geomserde_speedup_module.c
+++ b/python/src/geomserde_speedup_module.c
@@ -19,16 +19,14 @@
 
 #define PY_SSIZE_T_CLEAN
 #include <Python.h>
+#include <numpy/ndarraytypes.h>
+#include <numpy/npy_3kcompat.h>
+#include <numpy/ufuncobject.h>
 #include <stdio.h>
-//
-//#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
 
 #include "geomserde.h"
 #include "geos_c_dyn.h"
 #include "pygeos/c_api.h"
-#include <numpy/ndarraytypes.h>
-#include <numpy/npy_3kcompat.h>
-#include <numpy/ufuncobject.h>
 
 PyDoc_STRVAR(module_doc, "Geometry serialization/deserialization module.");
@@ -230,7 +228,7 @@ static PyObject *serialize(PyObject *self, PyObject *args) {
   return do_serialize(geos_geom);
 }
 
-static PyObject *deserialize_2(PyObject *self, PyObject *args) {
+static PyObject *deserialize(PyObject *self, PyObject *args) {
   GEOSContextHandle_t handle = NULL;
   int length = 0;
   GEOSGeometry *geom = do_deserialize(args, &handle, &length);
@@ -268,86 +266,86 @@ static PyObject *deserialize_1(PyObject *self, PyObject *args) {
 }
 
 static PyObject *to_sedona_func(PyObject *self, PyObject *args) {
-    import_array();
-    PyObject *input_obj = NULL;
-    if (!PyArg_ParseTuple(args, "O", &input_obj)){
-        return NULL;
-    };
-
-    PyArrayObject *array = (PyArrayObject *)input_obj;
-    PyObject **objs = (PyObject **)PyArray_DATA(array);
-
-    GEOSContextHandle_t handle = get_geos_context_handle();
-    if (handle == NULL) {
-        return NULL;
-    }
-
-    npy_intp n = PyArray_SIZE(input_obj);
-    npy_intp dims[1] = {n};
-    PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT);
-    for (npy_intp i = 0; i < PyArray_SIZE(array); i++) {
-        PyObject *obj = objs[i];
-        GEOSGeometry *geos_geom = NULL;
-        char success = PyGEOS_GetGEOSGeometry(obj, &geos_geom);
-
-        PyObject *serialized = do_serialize(geos_geom);
-        PyArray_SETITEM(out, PyArray_GETPTR1(out, i), serialized);
-    }
+  import_array();
+  PyObject *input_obj = NULL;
+  if (!PyArg_ParseTuple(args, "O", &input_obj)) {
+    return NULL;
+  };
+
+  PyArrayObject *array = (PyArrayObject *)input_obj;
+  PyObject **objs = (PyObject **)PyArray_DATA(array);
 
-    return out;
+  GEOSContextHandle_t handle = get_geos_context_handle();
+  if (handle == NULL) {
+    return NULL;
+  }
+
+  npy_intp n = PyArray_SIZE(input_obj);
+  npy_intp dims[1] = {n};
+  PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT);
+  for (npy_intp i = 0; i < PyArray_SIZE(array); i++) {
+    PyObject *obj = objs[i];
+    GEOSGeometry *geos_geom = NULL;
+    char success = PyGEOS_GetGEOSGeometry(obj, &geos_geom);
+
+    PyObject *serialized = do_serialize(geos_geom);
+    PyArray_SETITEM(out, PyArray_GETPTR1(out, i), serialized);
+  }
+
+  return out;
 }
 
 /* Module definition for Shapely 2.x */
 
 static PyObject *from_sedona_func(PyObject *self, PyObject *args) {
-    import_array();
-    PyObject *input_obj = NULL;
-    if (!PyArg_ParseTuple(args, "O", &input_obj)){
-        return NULL;
-    };
+  import_array();
+  PyObject *input_obj = NULL;
+  if (!PyArg_ParseTuple(args, "O", &input_obj)) {
+    return NULL;
+  };
 
-    GEOSContextHandle_t handle = get_geos_context_handle();
+  GEOSContextHandle_t handle = get_geos_context_handle();
 
-    PyArrayObject *array = (PyArrayObject *)input_obj;
-    PyObject **objs = (PyObject **)PyArray_DATA(array);
+  PyArrayObject *array = (PyArrayObject *)input_obj;
+  PyObject **objs = (PyObject **)PyArray_DATA(array);
 
-    int p_bytes_read = 0;
+  int p_bytes_read = 0;
 
-    npy_intp n = PyArray_SIZE(input_obj);
+  npy_intp n = PyArray_SIZE(input_obj);
 
-    npy_intp dims[1] = {n};
-    PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT);
+  npy_intp dims[1] = {n};
+  PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT);
 
-    for (npy_intp i = 0; i < PyArray_SIZE(array); i++) {
-        PyObject *obj = objs[i];
-        if (!PyBytes_Check(obj)) {
-            PyErr_SetString(PyExc_TypeError, "Expected bytes");
-            return NULL;
-        }
+  for (npy_intp i = 0; i < PyArray_SIZE(array); i++) {
+    PyObject *obj = objs[i];
+    if (!PyBytes_Check(obj)) {
+      PyErr_SetString(PyExc_TypeError, "Expected bytes");
+      return NULL;
+    }
 
-        char *buf = PyBytes_AS_STRING(obj);
+    char *buf = PyBytes_AS_STRING(obj);
 
-        Py_ssize_t len = PyBytes_GET_SIZE(obj);
+    Py_ssize_t len = PyBytes_GET_SIZE(obj);
 
-        GEOSGeometry *geom = NULL;
+    GEOSGeometry *geom = NULL;
 
-        SedonaErrorCode err = sedona_deserialize_geom(handle, buf, len, &geom, &p_bytes_read);
-        if (err != SEDONA_SUCCESS) {
-            handle_geomserde_error(err);
-            return NULL;
-        }
-        PyObject *pygeom = PyGEOS_CreateGeometry(geom, handle);
+    SedonaErrorCode err =
+        sedona_deserialize_geom(handle, buf, len, &geom, &p_bytes_read);
+    if (err != SEDONA_SUCCESS) {
+      handle_geomserde_error(err);
+      return NULL;
+    }
+    PyObject *pygeom = PyGEOS_CreateGeometry(geom, handle);
 
-        PyArray_SETITEM(out, PyArray_GETPTR1(out, i), pygeom);
-    }
+    PyArray_SETITEM(out, PyArray_GETPTR1(out, i), pygeom);
+  }
 
-    return out;
+  return out;
 }
 
-
 static PyMethodDef geomserde_methods_shapely_2[] = {
     {"load_libgeos_c", load_libgeos_c, METH_VARARGS, "Load libgeos_c."},
     {"serialize", serialize, METH_VARARGS,
     "Serialize geometry object as bytearray."},
-    {"deserialize_2", deserialize_2, METH_VARARGS,
+    {"deserialize", deserialize, METH_VARARGS,
     "Deserialize bytes-like object to geometry object."},
     {"from_sedona_func", from_sedona_func, METH_VARARGS,
     "Deserialize bytes-like object to geometry object."},
@@ -355,18 +353,6 @@ static PyMethodDef geomserde_methods_shapely_2[] = {
     "Deserialize bytes-like object to geometry object."},
     {NULL, NULL, 0, NULL}, /* Sentinel */
 };
-//
-//static int add_from_sedona_func_to_module(PyObject *m) {
-//  PyObject *capsule = PyCapsule_New((void *)from_sedona_func, "from_sedona_func", NULL);
-//  if (capsule == NULL) {
-//    return -1;
-//  }
-//  if (PyModule_AddObject(m, "from_sedona_func", capsule) < 0) {
-//    Py_DECREF(capsule);
-//    return -1;
-//  }
-//  return 0;
-//}
 
 static struct PyModuleDef geomserde_module_shapely_2 = {
     PyModuleDef_HEAD_INIT, "geomserde_speedup", module_doc, 0,
diff --git a/python/tests/test_base.py b/python/tests/test_base.py
index e240a09758..300d937d27 100644
--- a/python/tests/test_base.py
+++ b/python/tests/test_base.py
@@ -70,11 +70,15 @@ class TestBase:
                 "spark.sedona.stac.load.itemsLimitMax",
                 "20",
             )
-            .config("spark.executor.memory", "10G") \
-            .config("spark.driver.memory", "10G") \
-            .config("sedona.python.worker.udf.daemon.module", "sedona.spark.worker.daemon") \
-            .config("sedona.python.worker.daemon.enabled", "false") \
-            # Pandas on PySpark doesn't work with ANSI mode, which is enabled by default
+            .config("spark.executor.memory", "10G")
+            .config("spark.driver.memory", "10G")
+            .config(
+                "sedona.python.worker.udf.daemon.module",
+                "sedona.spark.worker.daemon",
+            )
+            .config(
+                "sedona.python.worker.daemon.enabled", "false"
+            )  # Pandas on PySpark doesn't work with ANSI mode, which is enabled by default
             # in Spark 4
             .config("spark.sql.ansi.enabled", "false")
         )
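Note: the two configuration keys wired up here are what let a session opt in
to the Sedona-specific Python worker; a hypothetical minimal builder (key
names come from the hunk above, values are illustrative):

    from sedona.spark import SedonaContext

    spark = (
        SedonaContext.builder()
        .config("sedona.python.worker.udf.daemon.module", "sedona.spark.worker.daemon")
        .config("sedona.python.worker.daemon.enabled", "false")
        .getOrCreate()
    )
    sedona = SedonaContext.create(spark)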
diff --git a/python/tests/utils/test_sedona_db_vectorized_udf.py b/python/tests/utils/test_sedona_db_vectorized_udf.py
index 4b266384fa..eea84eec91 100644
--- a/python/tests/utils/test_sedona_db_vectorized_udf.py
+++ b/python/tests/utils/test_sedona_db_vectorized_udf.py
@@ -1,9 +1,24 @@
-import time
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 import numpy as np
 
 from sedona.spark.sql.functions import sedona_db_vectorized_udf
-from sedona.spark.utils.udf import to_sedona_func, from_sedona_func
+from sedona.spark.utils.udf import to_sedona, from_sedona
 from tests.test_base import TestBase
 import pyarrow as pa
 import shapely
@@ -11,107 +26,21 @@ from sedona.sql import GeometryType
 from pyspark.sql.functions import expr, lit
 from pyspark.sql.types import DoubleType, IntegerType, ByteType
 from sedona.spark.sql import ST_X
-from shapely._enum import ParamEnum
-
-def test_m():
-    on_invalid="raise"
-    wkb = b'\x12\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf0?\x00\x00\x00\x00\x00\x00\xf0?'
-    geometry = np.asarray([wkb, wkb], dtype=object)
-
-    DecodingErrorOptions = ParamEnum(
-        "DecodingErrorOptions", {"ignore": 0, "warn": 1, "raise": 2, "fix": 3}
-    )
-
-    # print("sss")
-
-
-    # <class 'numpy.ndarray'>
-    # object
-    #   C_CONTIGUOUS : True
-    #   F_CONTIGUOUS : True
-    #   OWNDATA : False
-    #   WRITEABLE : True
-    #   ALIGNED : True
-    #   WRITEBACKIFCOPY : False
-    # print(type(geometry))
-    # print(geometry.dtype)
-    # print(geometry.flags)
-
-    result = from_sedona_func(geometry)
-
-    result2 = to_sedona_func(result)
-
-# ensure the input has object dtype, to avoid numpy inferring it as a
-# fixed-length string dtype (which removes trailing null bytes upon access
-# of array elements)
-    #
-    # def from_sedona_func(arr):
-    #     try:
-    #         from . import sedonaserde_vectorized_udf_module
-    #         print(sedonaserde_vectorized_udf_module.from_sedona_func_3(arr))
-    #     except Exception as e:
-    #         print("Cannot import sedonaserde_vectorized_udf_module:")
-    #         print(e)
-    #     # print()
-    #     return None
-#
-# def from_wkb(geometry, on_invalid="raise", **kwargs):
-#     r"""Create geometries from the Well-Known Binary (WKB) representation.
-#
-#     The Well-Known Binary format is defined in the `OGC Simple Features
-#     Specification for SQL <https://www.opengeospatial.org/standards/sfs>`__.
-#
-#     Parameters
-#     ----------
-#     geometry : str or array_like
-#         The WKB byte object(s) to convert.
-#     on_invalid : {"raise", "warn", "ignore", "fix"}, default "raise"
-#         Indicates what to do when an invalid WKB is encountered. Note that the
-#         validations involved are very basic, e.g. the minimum number of points
-#         for the geometry type. For a thorough check, use :func:`is_valid` after
-#         conversion to geometries. Valid options are:
-#
-#         - raise: an exception will be raised if any input geometry is invalid.
-#         - warn: a warning will be raised and invalid WKT geometries will be
-#           returned as ``None``.
-#         - ignore: invalid geometries will be returned as ``None`` without a
-#           warning.
-#         - fix: an effort is made to fix invalid input geometries (currently just
-#           unclosed rings). If this is not possible, they are returned as
-#           ``None`` without a warning. Requires GEOS >= 3.11.
-#
-#         .. versionadded:: 2.1.0
-#     **kwargs
-#         See :ref:`NumPy ufunc docs <ufuncs.kwargs>` for other keyword arguments.
-#
-#     Examples
-#     --------
-#     >>> import shapely
-#     >>> shapely.from_wkb(b'\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf0?\x00\x00\x00\x00\x00\x00\xf0?')
-#     <POINT (1 1)>
-#
-#     """  # noqa: E501
-#     if not np.isscalar(on_invalid):
-#         raise TypeError("on_invalid only accepts scalar values")
-#
-#     invalid_handler = np.uint8(DecodingErrorOptions.get_value(on_invalid))
-#
-#     # ensure the input has object dtype, to avoid numpy inferring it as a
-#     # fixed-length string dtype (which removes trailing null bytes upon access
-#     # of array elements)
-#     geometry = np.asarray(geometry, dtype=object)
-#     return lib.from_wkb(geometry, invalid_handler, **kwargs)
+
 
 class TestSedonaDBArrowFunction(TestBase):
     def test_vectorized_udf(self):
-        @sedona_db_vectorized_udf(return_type=GeometryType(), input_types=[GeometryType(), IntegerType()])
+        @sedona_db_vectorized_udf(
+            return_type=GeometryType(), input_types=[ByteType(), IntegerType()]
+        )
         def my_own_function(geom, distance):
             geom_wkb = pa.array(geom.storage.to_array())
+            geometry_array = np.asarray(geom_wkb, dtype=object)
             distance = pa.array(distance.to_array())
-            geom = shapely.from_wkb(geom_wkb)
+            geom = from_sedona(geometry_array)
             result_shapely = shapely.centroid(geom)
-            return pa.array(shapely.to_wkb(result_shapely))
+            return pa.array(to_sedona(result_shapely))
 
         df = self.spark.createDataFrame(
             [
@@ -125,56 +54,61 @@ class TestSedonaDBArrowFunction(TestBase):
         df.select(ST_X(my_own_function(df.wkt, lit(100)).alias("geom"))).show()
 
     def test_geometry_to_double(self):
-        @sedona_db_vectorized_udf(return_type=DoubleType(), input_types=[GeometryType()])
+        @sedona_db_vectorized_udf(return_type=DoubleType(), input_types=[ByteType()])
         def geometry_to_non_geometry_udf(geom):
             geom_wkb = pa.array(geom.storage.to_array())
-            geom = shapely.from_wkb(geom_wkb)
+            geometry_array = np.asarray(geom_wkb, dtype=object)
+            geom = from_sedona(geometry_array)
             result_shapely = shapely.get_x(shapely.centroid(geom))
-            return pa.array(result_shapely, pa.float64())
+            return pa.array(result_shapely)
 
         df = self.spark.createDataFrame(
             [(1, "POINT (1 1)"), (2, "POINT (2 2)"), (3, "POINT (3 3)")],
             ["id", "wkt"],
         ).withColumn("wkt", expr("ST_GeomFromWKT(wkt)"))
 
-        values = df.select(geometry_to_non_geometry_udf(df.wkt).alias("x_coord")) \
-            .collect()
+        values = df.select(
+            geometry_to_non_geometry_udf(df.wkt).alias("x_coord")
+        ).collect()
 
         values_list = [row["x_coord"] for row in values]
 
         assert values_list == [1.0, 2.0, 3.0]
 
     def test_geometry_to_int(self):
-        @sedona_db_vectorized_udf(return_type=IntegerType(), input_types=[GeometryType()])
+        @sedona_db_vectorized_udf(return_type=IntegerType(), input_types=[ByteType()])
         def geometry_to_int(geom):
             geom_wkb = pa.array(geom.storage.to_array())
-            geom = shapely.from_wkb(geom_wkb)
+            geometry_array = np.asarray(geom_wkb, dtype=object)
+
+            geom = from_sedona(geometry_array)
             result_shapely = shapely.get_num_points(geom)
-            return pa.array(result_shapely, pa.int32())
+            return pa.array(result_shapely)
 
         df = self.spark.createDataFrame(
             [(1, "POINT (1 1)"), (2, "POINT (2 2)"), (3, "POINT (3 3)")],
             ["id", "wkt"],
         ).withColumn("wkt", expr("ST_GeomFromWKT(wkt)"))
 
-        values = df.select(geometry_to_int(df.wkt)) \
-            .collect()
+        values = df.select(geometry_to_int(df.wkt)).collect()
 
         values_list = [row[0] for row in values]
 
         assert values_list == [0, 0, 0]
 
     def test_geometry_crs_preservation(self):
-        @sedona_db_vectorized_udf(return_type=GeometryType(), input_types=[GeometryType()])
+        @sedona_db_vectorized_udf(return_type=GeometryType(), input_types=[ByteType()])
         def return_same_geometry(geom):
             geom_wkb = pa.array(geom.storage.to_array())
-            geom = shapely.from_wkb(geom_wkb)
+            geometry_array = np.asarray(geom_wkb, dtype=object)
+
+            geom = from_sedona(geometry_array)
 
-            return pa.array(shapely.to_wkb(geom))
+            return pa.array(to_sedona(geom))
 
         df = self.spark.createDataFrame(
             [(1, "POINT (1 1)"), (2, "POINT (2 2)"), (3, "POINT (3 3)")],
@@ -183,70 +117,8 @@ class TestSedonaDBArrowFunction(TestBase):
 
         result_df = df.select(return_same_geometry(df.wkt).alias("geom"))
 
-        crs_list = result_df.selectExpr("ST_SRID(geom)").rdd.flatMap(lambda x: x).collect()
+        crs_list = (
+            result_df.selectExpr("ST_SRID(geom)").rdd.flatMap(lambda x: x).collect()
+        )
 
         assert crs_list == [3857, 3857, 3857]
-
-    def test_geometry_to_geometry(self):
-        @sedona_db_vectorized_udf(return_type=GeometryType(), input_types=[ByteType()])
-        def buffer_geometry(geom):
-            geom_wkb = pa.array(geom.storage.to_array())
-            geometry_array = np.asarray(geom_wkb, dtype=object)
-            geom = from_sedona_func(geometry_array)
-
-            result_shapely = shapely.buffer(geom, 10)
-
-            return pa.array(to_sedona_func(result_shapely))
-
-        df = self.spark.read.\
-            format("geoparquet").\
-            load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/book/source_data/transportation_barcelona_dupl_l1")
-
-        # 1 045 770
-        # print(df.count())
-
-        # df.unionAll(df).unionAll(df).unionAll(df).unionAll(df).unionAll(df).\
-        #     unionAll(df).unionAll(df).unionAll(df).unionAll(df).unionAll(df).\
-        #     write.format("geoparquet").mode("overwrite").save("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/book/source_data/transportation_barcelona_dupl_l2")
-        # 18 24
-        # df.union(df).union(df).union(df).union(df).union(df).union(df).\
-        #     write.format("geoparquet").mode("overwrite").save("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings_large_3")
-
-        values = df.select(buffer_geometry(df.geometry).alias("geometry")).\
-            selectExpr("ST_Area(geometry) as area").\
-            selectExpr("Sum(area) as total_area")
-
-        values.show()
-
-        # for _ in range(4):
-        #     start_time = time.time()
-        #     values.show()
-        #     end_time = time.time()
-        #     print(f"Execution time: {end_time - start_time} seconds")
-
-    def test_geometry_to_geometry_normal_udf(self):
-        from pyspark.sql.functions import udf
-
-        def create_buffer(geom):
-            return geom.buffer(10)
-
-        create_buffer_udf = udf(create_buffer, GeometryType())
-
-        df = self.spark.read. \
-            format("geoparquet"). \
-            load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/book/source_data/transportation_barcelona_dupl_l2")
-
-        # print(df.count())
-        # df.limit(10).collect()
-        values = df.select(create_buffer_udf(df.geometry).alias("geometry")). \
-            selectExpr("ST_Area(geometry) as area"). \
-            selectExpr("Sum(area) as total_area")
-
-        values.show()
-
-        # for _ in range(4):
-        #     start_time = time.time()
-        #     values.show()
-        #     end_time = time.time()
-        #     print(f"Execution time: {end_time - start_time} seconds")
-# 1 045 770
diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
index 4fe4acfb12..94ce194c65 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
@@ -73,21 +73,11 @@ class StrategySuite extends TestBaseScala with Matchers {
       .withColumn("geometry", expr("ST_SetSRID(geom, '4326')"))
       .select(sedonaDBGeometryToGeometryFunction(col("geometry"), lit(100)).alias("geom"))
 
-    dfVectorized.selectExpr("ST_X(ST_Centroid(geom)) AS x")
+    dfVectorized
+      .selectExpr("ST_X(ST_Centroid(geom)) AS x")
       .selectExpr("sum(x)")
       .as[Double]
-      .collect().head shouldEqual 101
-//
-//    val dfCopied = sparkSession.read
-//      .format("geoparquet")
-//      .load(
-//        "/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/book/source_data/transportation_barcelona/barcelona.geoparquet")
-//
-//    val values = dfCopied
-//      .select(sedonaDBGeometryToGeometryFunction(col("geometry"), lit(10)).alias("geom"))
-//      .selectExpr("ST_Area(geom) as area")
-//      .selectExpr("Sum(area) as total_area")
-//
-//    values.show()
+      .collect()
+      .head shouldEqual 101
   }
 }
