This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch add-sedona-worker-daemon-mode
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 1c3aa88dadb434fb11cd72d0606cc90b8112f269
Author: pawelkocinski <[email protected]>
AuthorDate: Fri Dec 19 23:13:31 2025 +0100

    SEDONA-738 Add sedonadb worker
---
 .github/workflows/java.yml                         |   3 +-
 .github/workflows/pyflink.yml                      |   2 +-
 .github/workflows/python-extension.yml             |   2 +-
 .github/workflows/python.yml                       |  26 +-
 pom.xml                                            |   1 +
 python/pyproject.toml                              |   4 +-
 python/sedona/spark/sql/functions.py               |  87 +++++-
 python/sedona/spark/utils/geometry_serde.py        |  38 ++-
 python/sedona/spark/worker/__init__.py             |  16 +
 python/sedona/spark/worker/daemon.py               | 218 +++++++++++++
 python/sedona/spark/worker/serde.py                |  76 +++++
 python/sedona/spark/worker/udf_info.py             |  54 ++++
 python/sedona/spark/worker/worker.py               | 304 ++++++++++++++++++
 python/setup.py                                    |  23 ++
 python/src/geomserde_speedup_module.c              | 110 +++++++
 python/tests/test_base.py                          |  12 +-
 .../tests/utils/test_sedona_db_vectorized_udf.py   | 147 +++++++++
 .../org/apache/sedona/spark/SedonaContext.scala    |   2 +-
 .../org/apache/sedona/sql/UDF/PythonEvalType.scala |   9 +
 .../execution/python/SedonaArrowPythonRunner.scala |  60 ++++
 .../sql/execution/python/SedonaArrowStrategy.scala | 161 ++++++++++
 .../execution/python/SedonaBasePythonRunner.scala  | 129 ++++++++
 .../execution/python/SedonaDBWorkerFactory.scala   | 343 +++++++++++++++++++++
 .../execution/python/SedonaPythonArrowInput.scala  | 108 +++++++
 .../execution/python/SedonaPythonArrowOutput.scala | 179 +++++++++++
 .../spark/sql/execution/python/WorkerContext.scala |  58 ++++
 .../spark/sql/udf/ExtractSedonaUDFRule.scala       |  14 +-
 .../apache/spark/sql/udf/SedonaArrowStrategy.scala |  89 ------
 .../org/apache/sedona/sql/TestBaseScala.scala      |   4 +-
 .../org/apache/spark/sql/udf/StrategySuite.scala   |  42 ++-
 .../apache/spark/sql/udf/TestScalarPandasUDF.scala | 146 +++++++--
 .../python}/SedonaArrowStrategy.scala              |   7 +-
 .../spark/sql/udf/ExtractSedonaUDFRule.scala       |  14 +-
 33 files changed, 2325 insertions(+), 163 deletions(-)

diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml
index 76834c9d63..36c95d6c66 100644
--- a/.github/workflows/java.yml
+++ b/.github/workflows/java.yml
@@ -109,8 +109,9 @@ jobs:
           SPARK_COMPAT_VERSION=${SPARK_VERSION:0:3}
 
           if [[ "${SPARK_VERSION}" == "3.5"* ]] || [[ "${SPARK_VERSION}" == 
"4."* ]]; then
-              pip install pyspark==$SPARK_VERSION pandas shapely apache-sedona 
pyarrow
+              pip install pyspark==$SPARK_VERSION pandas shapely apache-sedona 
pyarrow geoarrow-pyarrow sedonadb
               export SPARK_HOME=$(python -c "import pyspark; 
print(pyspark.__path__[0])")
+              (cd python; pip install --force-reinstall --no-deps -e .)
           fi
 
           mvn -q clean install -Dspark=${SPARK_COMPAT_VERSION} 
-Dscala=${SCALA_VERSION:0:4} -Dspark.version=${SPARK_VERSION} ${SKIP_TESTS}
diff --git a/.github/workflows/pyflink.yml b/.github/workflows/pyflink.yml
index 9b86d74ab7..8cb5632d93 100644
--- a/.github/workflows/pyflink.yml
+++ b/.github/workflows/pyflink.yml
@@ -70,7 +70,7 @@ jobs:
         run: |
           cd python
           uv add apache-flink==1.20.1
-          uv sync
+      #          uv sync --extra flink
       - name: Run PyFlink tests
         run: |
           wget -q 
https://repo1.maven.org/maven2/org/datasyslab/geotools-wrapper/1.8.0-33.1-rc1/geotools-wrapper-1.8.0-33.1-rc1.jar
diff --git a/.github/workflows/python-extension.yml 
b/.github/workflows/python-extension.yml
index c4eb20a881..6fb88bd0c9 100644
--- a/.github/workflows/python-extension.yml
+++ b/.github/workflows/python-extension.yml
@@ -51,7 +51,7 @@ jobs:
     strategy:
       matrix:
         os: ['ubuntu-latest', 'windows-latest', 'macos-15']
-        python: ['3.11', '3.10', '3.9', '3.8']
+        python: ['3.11', '3.10', '3.9']
     runs-on: ${{ matrix.os }}
     defaults:
       run:
diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml
index 05782bbae9..9b5f170597 100644
--- a/.github/workflows/python.yml
+++ b/.github/workflows/python.yml
@@ -86,10 +86,6 @@ jobs:
             java: '11'
             python: '3.9'
           - spark: '3.5.0'
-            scala: '2.12.8'
-            java: '11'
-            python: '3.8'
-          - spark: '3.4.0'
             scala: '2.12.8'
             java: '11'
             python: '3.11'
@@ -101,15 +97,6 @@ jobs:
             scala: '2.12.8'
             java: '11'
             python: '3.9'
-          - spark: '3.4.0'
-            scala: '2.12.8'
-            java: '11'
-            python: '3.8'
-          - spark: '3.4.0'
-            scala: '2.12.8'
-            java: '11'
-            python: '3.8'
-            shapely: '1'
 
     steps:
       - uses: actions/checkout@v6
@@ -181,7 +168,18 @@ jobs:
         run: |
           cd python
           export SPARK_HOME=$(uv run python -c "import site; 
print(site.getsitepackages()[0]+'/pyspark')")
-          uv run pytest -v tests
+          uv run pytest -m "not vectorized" -v tests
+      - name: Run vectorized udf tests
+        if: ${{ matrix.spark < '4.0.0' }}
+        run: |
+          cd python
+          export SPARK_HOME=$(uv run python -c "import site; 
print(site.getsitepackages()[0]+'/pyspark')")
+          uv pip install --force-reinstall --no-deps -e .
+          uv remove apache-flink --optional flink
+          uv add "pyarrow>=16.0.0"
+          uv add "shapely>=2.0.0"
+          uv add sedonadb geopandas geoarrow-pyarrow
+          uv run pytest -m vectorized tests
       - name: Run basic tests without rasterio
         run: |
           cd python
diff --git a/pom.xml b/pom.xml
index d6e4e81319..071f5233d8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -631,6 +631,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-javadoc-plugin</artifactId>
+<!--                <version>3.12.0</version>-->
                 <version>2.10.4</version>
                 <executions>
                     <execution>
diff --git a/python/pyproject.toml b/python/pyproject.toml
index b988966c4f..2205bd047e 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -16,7 +16,7 @@
 # under the License.
 
 [build-system]
-requires = ["setuptools>=69", "wheel"]
+requires = ["setuptools", "wheel", "numpy"]
 build-backend = "setuptools.build_meta"
 
 [project]
@@ -26,7 +26,7 @@ description = "Apache Sedona is a cluster computing system 
for processing large-
 readme = "README.md"
 license = { text = "Apache-2.0" }
 authors = [ { name = "Apache Sedona", email = "[email protected]" } ]
-requires-python = ">=3.8"
+requires-python = ">=3.9"
 classifiers = [
   "Programming Language :: Python :: 3",
   "License :: OSI Approved :: Apache Software License",
diff --git a/python/sedona/spark/sql/functions.py 
b/python/sedona/spark/sql/functions.py
index 2420301d52..e3224118f9 100644
--- a/python/sedona/spark/sql/functions.py
+++ b/python/sedona/spark/sql/functions.py
@@ -21,12 +21,20 @@ from enum import Enum
 
 import pandas as pd
 
-from sedona.spark.sql.types import GeometryType
 from sedona.spark.utils import geometry_serde
-from pyspark.sql.udf import UserDefinedFunction
-from pyspark.sql.types import DataType
 from shapely.geometry.base import BaseGeometry
+from pyspark.sql.udf import UserDefinedFunction
+from sedona.spark.sql.types import GeometryType
+from pyspark.sql.types import (
+    DataType,
+    FloatType,
+    DoubleType,
+    IntegerType,
+    StringType,
+    ByteType,
+)
 
+from sedona.spark.utils.geometry_serde import sedona_db_speedup_enabled
 
 SEDONA_SCALAR_EVAL_TYPE = 5200
 SEDONA_PANDAS_ARROW_NAME = "SedonaPandasArrowUDF"
@@ -142,3 +150,76 @@ def serialize_to_geometry_if_geom(data, return_type: 
DataType):
         return geometry_serde.serialize(data)
 
     return data
+
+
+def infer_pa_type(spark_type: DataType):
+    """Translate a Spark SQL data type into the corresponding Arrow type.
+
+    Geometry maps to the geoarrow WKB type; float, double, integer and string
+    map to ``float32``, ``float64``, ``int32`` and ``string`` respectively.
+
+    Raises:
+        NotImplementedError: if ``spark_type`` is none of the types above.
+    """
+    import pyarrow as pa
+    import geoarrow.pyarrow as ga
+
+    # Checked top to bottom; the first matching Spark class decides the result.
+    # The factories are only called once a match is found.
+    candidates = (
+        (GeometryType, ga.wkb),
+        (FloatType, pa.float32),
+        (DoubleType, pa.float64),
+        (IntegerType, pa.int32),
+        (StringType, pa.string),
+    )
+    for spark_cls, make_arrow_type in candidates:
+        if isinstance(spark_type, spark_cls):
+            return make_arrow_type()
+
+    raise NotImplementedError(f"Type {spark_type} is not supported yet.")
+
+
+def infer_input_type(spark_type: DataType):
+    """Map a Spark SQL data type to the sedonadb UDF input type constant.
+
+    Geometry maps to ``GEOMETRY``; float, double and integer map to
+    ``NUMERIC``; string maps to ``STRING``; binary maps to ``BINARY``.
+
+    Raises:
+        NotImplementedError: if ``spark_type`` is none of the types above.
+    """
+    # Imported locally so this block does not rely on the module import list.
+    from pyspark.sql.types import BinaryType
+    from sedonadb import udf as sedona_udf_module
+
+    if isinstance(spark_type, GeometryType):
+        return sedona_udf_module.GEOMETRY
+    if isinstance(spark_type, (FloatType, DoubleType, IntegerType)):
+        return sedona_udf_module.NUMERIC
+    if isinstance(spark_type, StringType):
+        return sedona_udf_module.STRING
+    # BinaryType is Spark's byte-array type and is the natural match for
+    # BINARY. ByteType (a one-byte integer in Spark) used to be the only
+    # spelling accepted here, so it is kept to avoid breaking existing callers.
+    # NOTE(review): confirm whether ByteType should map to NUMERIC instead.
+    if isinstance(spark_type, (BinaryType, ByteType)):
+        return sedona_udf_module.BINARY
+
+    raise NotImplementedError(f"Type {spark_type} is not supported yet.")
+
+
+def infer_input_types(spark_types: list[DataType]):
+    """Apply ``infer_input_type`` to every entry, preserving order.
+
+    Returns a new list; an unsupported entry propagates the
+    ``NotImplementedError`` raised by ``infer_input_type``.
+    """
+    return [infer_input_type(spark_type) for spark_type in spark_types]
+
+
+def sedona_db_vectorized_udf(
+    return_type: DataType,
+    input_types: list[DataType],
+):
+    """Decorator factory turning a function into a sedonadb-backed Spark UDF.
+
+    Args:
+        return_type: Spark type of the UDF result; converted with
+            ``infer_pa_type`` when the decorator is applied.
+        input_types: Spark types of the UDF arguments; converted with
+            ``infer_input_types`` when the decorator is applied.
+
+    Returns:
+        A decorator that wraps the function with ``sedonadb.udf.arrow_udf``
+        and returns a ``UserDefinedFunction``.
+    """
+    from sedonadb import udf as sedona_udf_module
+
+    # 6201 is selected when geometry_serde reports sedona_db_speedup_enabled,
+    # 6200 otherwise.
+    eval_type = 6201 if sedona_db_speedup_enabled else 6200
+
+    def apply_fn(fn):
+        arrow_return_type = infer_pa_type(return_type)
+        sedona_db_arg_types = infer_input_types(input_types)
+
+        @sedona_udf_module.arrow_udf(arrow_return_type, input_types=sedona_db_arg_types)
+        def shapely_udf(*args, **kwargs):
+            return fn(*args, **kwargs)
+
+        return UserDefinedFunction(
+            lambda: shapely_udf, return_type, "SedonaPandasArrowUDF", evalType=eval_type
+        )
+
+    return apply_fn
diff --git a/python/sedona/spark/utils/geometry_serde.py 
b/python/sedona/spark/utils/geometry_serde.py
index 103eb49817..a8b58e05a5 100644
--- a/python/sedona/spark/utils/geometry_serde.py
+++ b/python/sedona/spark/utils/geometry_serde.py
@@ -24,7 +24,7 @@ import shapely
 from shapely.geometry.base import BaseGeometry
 
 speedup_enabled = False
-
+sedona_db_speedup_enabled = False
 
 # Use geomserde_speedup when available, otherwise fallback to general pure
 # python implementation.
@@ -62,7 +62,14 @@ try:
                 return None
             return geomserde_speedup.deserialize(buf)
 
+        def to_sedona(arr):
+            return geomserde_speedup.to_sedona_func(arr)
+
+        def from_sedona(arr):
+            return geomserde_speedup.from_sedona_func(arr)
+
         speedup_enabled = True
+        sedona_db_speedup_enabled = True
 
     elif shapely.__version__.startswith("1."):
         # Shapely 1.x uses ctypes.CDLL to load geos_c library. We can obtain 
the
@@ -123,14 +130,43 @@ try:
             ob.__dict__["_is_empty"] = False
             return ob, bytes_read
 
+        warn(
+            f"optimized sedonadb vectorized function is only available for 
shapely 2.x, using fallback implementation."
+        )
+
+        def to_sedona(arr):
+            return shapely.to_wkb(arr)
+
+        def from_sedona(arr):
+            return shapely.from_wkb(arr)
+
         speedup_enabled = True
 
     else:
+
+        def to_sedona(arr):
+            return shapely.to_wkb(arr)
+
+        def from_sedona(arr):
+            return shapely.from_wkb(arr)
+
         # fallback to our general pure python implementation
         from .geometry_serde_general import deserialize, serialize
 
+
 except Exception as e:
     warn(
         f"Cannot load geomserde_speedup, fallback to general python 
implementation. Reason: {e}"
     )
+
+    warn(
+        f"Cannot load optimized version of sedonadb vectorized function, using 
fallback implementation. Reason: {e}"
+    )
+
+    def to_sedona(arr):
+        return shapely.to_wkb(arr)
+
+    def from_sedona(arr):
+        return shapely.from_wkb(arr)
+
     from .geometry_serde_general import deserialize, serialize
diff --git a/python/sedona/spark/worker/__init__.py 
b/python/sedona/spark/worker/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/python/sedona/spark/worker/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/python/sedona/spark/worker/daemon.py 
b/python/sedona/spark/worker/daemon.py
new file mode 100644
index 0000000000..0c03dde5b8
--- /dev/null
+++ b/python/sedona/spark/worker/daemon.py
@@ -0,0 +1,218 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import numbers
+import os
+import signal
+import select
+import socket
+import sys
+import traceback
+import time
+import gc
+from errno import EINTR, EAGAIN
+from socket import AF_INET, AF_INET6, SOCK_STREAM, SOMAXCONN
+from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
+
+from sedona.spark.worker.worker import main as worker_main
+from pyspark.serializers import read_int, write_int, write_with_length, 
UTF8Deserializer
+
+
+def compute_real_exit_code(exit_code):
+    """Normalize a ``SystemExit`` code into a value ``os._exit`` accepts.
+
+    ``SystemExit.code`` can be an integer or a string, but ``os._exit`` only
+    takes integers: integral codes pass through unchanged, anything else
+    becomes 1.
+    """
+    return exit_code if isinstance(exit_code, numbers.Integral) else 1
+
+
+def worker(sock, authenticated):
+    """
+    Called by a worker process after the fork().
+
+    Resets the signal handlers, optionally authenticates the client on
+    ``sock`` and then runs ``worker_main`` over the connection.
+
+    :param sock: connected socket accepted by the daemon.
+    :param authenticated: when false, a secret is read from the socket and
+        compared with the ``PYTHON_WORKER_FACTORY_SECRET`` environment variable.
+    :return: 1 when authentication fails; otherwise 0, or the exit code derived
+        from a ``SystemExit`` raised by ``worker_main``.
+    """
+    # Put SIGHUP/SIGCHLD/SIGTERM back to their default dispositions.
+    signal.signal(SIGHUP, SIG_DFL)
+    signal.signal(SIGCHLD, SIG_DFL)
+    signal.signal(SIGTERM, SIG_DFL)
+    # restore the handler for SIGINT,
+    # it's useful for debugging (show the stacktrace before exit)
+    signal.signal(SIGINT, signal.default_int_handler)
+
+    # Read the socket using fdopen instead of socket.makefile() because the latter
+    # seems to be very slow; note that we need to dup() the file descriptor because
+    # otherwise writes also cause a seek that makes us miss data on the read side.
+    buffer_size = int(os.environ.get("SPARK_BUFFER_SIZE", 65536))
+    infile = os.fdopen(os.dup(sock.fileno()), "rb", buffer_size)
+    outfile = os.fdopen(os.dup(sock.fileno()), "wb", buffer_size)
+
+    if not authenticated:
+        # Handshake: the client sends its secret and gets b"ok" or b"err" back.
+        client_secret = UTF8Deserializer().loads(infile)
+        if os.environ["PYTHON_WORKER_FACTORY_SECRET"] == client_secret:
+            write_with_length(b"ok", outfile)
+            outfile.flush()
+        else:
+            write_with_length(b"err", outfile)
+            outfile.flush()
+            sock.close()
+            return 1
+
+    exit_code = 0
+    try:
+        worker_main(infile, outfile)
+    except SystemExit as exc:
+        # SystemExit.code may not be an int; normalize it for the caller.
+        exit_code = compute_real_exit_code(exc.code)
+    finally:
+        # Best-effort flush; a failure here is deliberately ignored.
+        try:
+            outfile.flush()
+        except Exception:  # nosec
+            pass
+    return exit_code
+
+
+def manager():
+    """Run the daemon loop that forks one worker process per connection.
+
+    Binds a listening socket on the loopback interface (IPv6 when
+    ``SPARK_PREFER_IPV6`` is "true"), writes the chosen port to stdout and then
+    multiplexes over stdin and the listening socket:
+
+    * an integer read from stdin is treated as a worker pid and sent SIGKILL;
+      EOF on stdin shuts the daemon down;
+    * every accepted connection is handed to a forked child, which writes its
+      pid back on the socket and runs ``worker`` (repeatedly while
+      ``SPARK_REUSE_WORKER`` is set and ``worker`` returns 0).
+    """
+    # Create a new process group to corral our children
+    os.setpgid(0, 0)
+
+    # Create a listening socket on the loopback interface
+    if os.environ.get("SPARK_PREFER_IPV6", "false").lower() == "true":
+        listen_sock = socket.socket(AF_INET6, SOCK_STREAM)
+        listen_sock.bind(("::1", 0, 0, 0))
+        listen_sock.listen(max(1024, SOMAXCONN))
+        listen_host, listen_port, _, _ = listen_sock.getsockname()
+    else:
+        listen_sock = socket.socket(AF_INET, SOCK_STREAM)
+        listen_sock.bind(("127.0.0.1", 0))
+        listen_sock.listen(max(1024, SOMAXCONN))
+        listen_host, listen_port = listen_sock.getsockname()
+
+    # re-open stdin/stdout in binary mode
+    stdin_bin = os.fdopen(sys.stdin.fileno(), "rb", 4)
+    stdout_bin = os.fdopen(sys.stdout.fileno(), "wb", 4)
+    write_int(listen_port, stdout_bin)
+    stdout_bin.flush()
+
+    def shutdown(code):
+        signal.signal(SIGTERM, SIG_DFL)
+        # Send SIGHUP to notify workers of shutdown
+        os.kill(0, SIGHUP)
+        sys.exit(code)
+
+    def handle_sigterm(*args):
+        shutdown(1)
+
+    signal.signal(SIGTERM, handle_sigterm)  # Gracefully exit on SIGTERM
+    signal.signal(SIGHUP, SIG_IGN)  # Don't die on SIGHUP
+    signal.signal(SIGCHLD, SIG_IGN)
+
+    reuse = os.environ.get("SPARK_REUSE_WORKER")
+
+    # Initialization complete
+    try:
+        while True:
+            try:
+                ready_fds = select.select([0, listen_sock], [], [], 1)[0]
+            except OSError as ex:
+                # OSError is not subscriptable on Python 3; read errno instead
+                # of indexing the exception, matching the handlers below.
+                if ex.errno == EINTR:
+                    continue
+                else:
+                    raise
+
+            if 0 in ready_fds:
+                try:
+                    worker_pid = read_int(stdin_bin)
+                except EOFError:
+                    # Spark told us to exit by closing stdin
+                    shutdown(0)
+                try:
+                    os.kill(worker_pid, signal.SIGKILL)
+                except OSError:
+                    pass  # process already died
+
+            if listen_sock in ready_fds:
+                try:
+                    sock, _ = listen_sock.accept()
+                except OSError as e:
+                    if e.errno == EINTR:
+                        continue
+                    raise
+
+                # Launch a worker process
+                try:
+                    pid = os.fork()
+                except OSError as e:
+                    if e.errno in (EAGAIN, EINTR):
+                        time.sleep(1)
+                        pid = os.fork()  # error here will shutdown daemon
+                    else:
+                        outfile = sock.makefile(mode="wb")
+                        write_int(e.errno, outfile)  # Signal that the fork failed
+                        outfile.flush()
+                        outfile.close()
+                        sock.close()
+                        continue
+
+                if pid == 0:
+                    # in child process
+                    listen_sock.close()
+
+                    # It should close the standard input in the child process so that
+                    # Python native function executions stay intact.
+                    #
+                    # Note that if we just close the standard input (file descriptor 0),
+                    # the lowest file descriptor (file descriptor 0) will be allocated,
+                    # later when other file descriptors should happen to open.
+                    #
+                    # Therefore, here we redirect it to '/dev/null' by duplicating
+                    # another file descriptor for '/dev/null' to the standard input (0).
+                    # See SPARK-26175.
+                    devnull = open(os.devnull)
+                    os.dup2(devnull.fileno(), 0)
+                    devnull.close()
+
+                    try:
+                        # Acknowledge that the fork was successful
+                        outfile = sock.makefile(mode="wb")
+                        write_int(os.getpid(), outfile)
+                        outfile.flush()
+                        outfile.close()
+                        authenticated = False
+                        while True:
+                            code = worker(sock, authenticated)
+                            if code == 0:
+                                authenticated = True
+                            if not reuse or code:
+                                # wait for closing
+                                try:
+                                    while sock.recv(1024):
+                                        pass
+                                except Exception:  # nosec
+                                    pass
+                                break
+                            gc.collect()
+                    except BaseException:
+                        traceback.print_exc()
+                        os._exit(1)
+                    else:
+                        os._exit(0)
+                else:
+                    # Parent: the child owns the connection now.
+                    sock.close()
+
+    finally:
+        shutdown(1)
+
+
+if __name__ == "__main__":
+    manager()
diff --git a/python/sedona/spark/worker/serde.py 
b/python/sedona/spark/worker/serde.py
new file mode 100644
index 0000000000..52e7b663a5
--- /dev/null
+++ b/python/sedona/spark/worker/serde.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyspark.serializers import write_int, SpecialLengths
+from pyspark.sql.pandas.serializers import ArrowStreamPandasSerializer
+
+from sedona.spark.worker.udf_info import UDFInfo
+
+
+class SedonaDBSerializer(ArrowStreamPandasSerializer):
+    """Arrow stream serializer that feeds incoming batches through sedonadb.
+
+    ``db`` is the sedonadb context used to register views and run SQL,
+    ``udf_info`` supplies the SQL applied to every incoming batch, and
+    ``cast_to_wkb`` is forwarded to ``udf_info`` when that SQL is built.
+    """
+
+    def __init__(self, timezone, safecheck, db, udf_info: UDFInfo, cast_to_wkb=False):
+        super().__init__(timezone, safecheck)
+        self.db = db
+        self.udf_info = udf_info
+        self.cast_to_wkb = cast_to_wkb
+
+    def load_stream(self, stream):
+        """Yield one sedonadb query result per batch read from ``stream``.
+
+        Each batch is registered as the view ``my_table_<index>`` (index
+        starting at 0) and queried with
+        ``udf_info.sedona_db_transformation_expr``.
+        """
+        import pyarrow as pa
+
+        # The explicit two-argument super() starts the method lookup *after*
+        # ArrowStreamPandasSerializer, so its own load_stream is bypassed.
+        # NOTE(review): presumably this is to get raw Arrow batches - confirm.
+        batches = super(ArrowStreamPandasSerializer, self).load_stream(stream)
+        for index, batch in enumerate(batches):
+            table = pa.Table.from_batches(batches=[batch])
+            df = self.db.create_data_frame(table)
+
+            table_name = f"my_table_{index}"
+            df.to_view(table_name)
+
+            sql_expression = self.udf_info.sedona_db_transformation_expr(
+                table_name, self.cast_to_wkb
+            )
+
+            yield self.db.sql(sql_expression)
+
+    def arrow_dump_stream(self, iterator, stream):
+        """Write every batch from ``iterator`` to ``stream`` as one Arrow stream.
+
+        The writer is created from the first batch's schema; when the iterator
+        is empty no writer is created and nothing is written.
+        """
+        import pyarrow as pa
+
+        writer = None
+        try:
+            for batch in iterator:
+                if writer is None:
+                    writer = pa.RecordBatchStreamWriter(stream, batch.schema)
+                writer.write_batch(batch)
+        finally:
+            if writer is not None:
+                writer.close()
+
+    def dump_stream(self, iterator, stream):
+        """Write ``iterator`` to ``stream``, preceded by the start marker.
+
+        ``SpecialLengths.START_ARROW_STREAM`` is written lazily, right before
+        the first batch, so an empty iterator produces no output at all.
+        """
+
+        def init_stream_yield_batches():
+            should_write_start_length = True
+            for batch in iterator:
+                if should_write_start_length:
+                    write_int(SpecialLengths.START_ARROW_STREAM, stream)
+                    should_write_start_length = False
+
+                yield batch
+
+        return self.arrow_dump_stream(init_stream_yield_batches(), stream)
diff --git a/python/sedona/spark/worker/udf_info.py 
b/python/sedona/spark/worker/udf_info.py
new file mode 100644
index 0000000000..32a0833f51
--- /dev/null
+++ b/python/sedona/spark/worker/udf_info.py
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass
+
+from sedona.spark import GeometryType
+
+
+@dataclass
+class UDFInfo:
+    """Description of one UDF call plus helpers that render its sedonadb SQL."""
+
+    # Offsets of the UDF arguments; rendered as the column names `_<offset>`.
+    arg_offsets: list
+    # Maps the offset of a geometry argument to the CRS code used for it.
+    geom_offsets: dict
+    function: object
+    return_type: object
+    # Function name used in the generated SQL call.
+    name: str
+
+    def get_function_call_sql(self, table_name: str, cast_to_wkb: bool = False) -> str:
+        """Build the SELECT that calls the UDF over ``table_name``.
+
+        The result column is aliased ``_0``. When the return type is a
+        geometry and ``cast_to_wkb`` is set, the call is wrapped in
+        ``ST_GeomToSedonaSpark``.
+        """
+        call_args = ", ".join(f"_{offset}" for offset in self.arg_offsets)
+        call_expr = f"{self.name}({call_args})"
+
+        wrap_result = isinstance(self.return_type, GeometryType) and cast_to_wkb
+        if not wrap_result:
+            return f"SELECT {call_expr} AS _0 FROM {table_name}"  # nosec
+
+        return f"SELECT ST_GeomToSedonaSpark({call_expr}) AS _0 FROM {table_name}"  # nosec
+
+    def sedona_db_transformation_expr(
+        self, table_name: str, cast_to_wkb: bool = False
+    ) -> str:
+        """Build the SELECT that prepares the argument columns of ``table_name``.
+
+        Geometry arguments are wrapped in ``ST_GeomFromSedonaSpark`` with their
+        ``EPSG:<crs>`` code when ``cast_to_wkb`` is set; every other argument
+        is selected as-is.
+        """
+
+        def column_expr(offset):
+            if offset in self.geom_offsets and cast_to_wkb:
+                crs = self.geom_offsets[offset]
+                return f"ST_GeomFromSedonaSpark(_{offset}, 'EPSG:{crs}') AS _{offset}"  # nosec
+            return f"_{offset}"
+
+        select_list = ", ".join(column_expr(offset) for offset in self.arg_offsets)
+        return f"SELECT {select_list} FROM {table_name}"  # nosec
diff --git a/python/sedona/spark/worker/worker.py 
b/python/sedona/spark/worker/worker.py
new file mode 100644
index 0000000000..e31d8e76d1
--- /dev/null
+++ b/python/sedona/spark/worker/worker.py
@@ -0,0 +1,304 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import importlib
+import os
+import sys
+import time
+
+import sedonadb
+from pyspark import TaskContext, shuffle, SparkFiles
+from pyspark.errors import PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.resource import ResourceInformation
+from pyspark.serializers import (
+    read_int,
+    UTF8Deserializer,
+    read_bool,
+    read_long,
+    CPickleSerializer,
+    write_int,
+    write_long,
+    SpecialLengths,
+)
+
+from sedona.spark.worker.serde import SedonaDBSerializer
+from sedona.spark.worker.udf_info import UDFInfo
+
+
+def apply_iterator(db, iterator, udf_info: UDFInfo, cast_to_wkb: bool = False):
+    """Run the UDF over every incoming frame and yield Arrow record batches.
+
+    Each frame is registered as view ``output_table_<n>`` (n starts at 1),
+    queried with the SQL produced by ``udf_info``, and the result is also
+    registered as ``view_<n>`` before being converted to Arrow batches.
+    """
+    for batch_no, frame in enumerate(iterator, start=1):
+        input_view = f"output_table_{batch_no}"
+        frame.to_view(input_view)
+
+        query = udf_info.get_function_call_sql(input_view, cast_to_wkb=cast_to_wkb)
+        result = db.sql(query)
+        result.to_view(f"view_{batch_no}")
+
+        arrow_table = result.to_arrow_table()
+        yield from arrow_table.combine_chunks().to_batches()
+
+
+def check_python_version(utf_serde: UTF8Deserializer, infile) -> str:
+    """Read the driver's ``major.minor`` Python version and compare it to ours.
+
+    Returns the version string read from ``infile``; raises
+    ``PySparkRuntimeError`` when it differs from the running interpreter.
+    """
+    driver_version = utf_serde.loads(infile)
+    worker_major_minor = sys.version_info[:2]
+
+    if driver_version == "%d.%d" % worker_major_minor:
+        return driver_version
+
+    raise PySparkRuntimeError(
+        error_class="PYTHON_VERSION_MISMATCH",
+        message_parameters={
+            "worker_version": str(sys.version_info[:2]),
+            "driver_version": str(driver_version),
+        },
+    )
+
+
+def check_barrier_flag(infile):
+    """Consume the barrier section of the handshake and reject barrier mode.
+
+    Three values are read in order: a bool flag, an int and a UTF-8 string.
+    The int and the string are read only to keep the stream aligned.
+    Returns the flag (always falsy on return); raises ``PySparkRuntimeError``
+    when the flag is set.
+    """
+    barrier_requested = read_bool(infile)
+    _bound_port = read_int(infile)
+    _secret = UTF8Deserializer().loads(infile)
+
+    if not barrier_requested:
+        return barrier_requested
+
+    raise PySparkRuntimeError(
+        error_class="BARRIER_MODE_NOT_SUPPORTED",
+        message_parameters={
+            "worker_version": str(sys.version_info[:2]),
+            "message": "Barrier mode is not supported by SedonaDB vectorized functions.",
+        },
+    )
+
+
+def assign_task_context(utf_serde: UTF8Deserializer, infile):
+    """Populate the process-wide ``TaskContext`` from the handshake stream.
+
+    Reads, in order: stage id, partition id, attempt number, task attempt id,
+    cpus, the resource table and the local properties.
+
+    Returns the populated ``TaskContext``.
+    """
+    # NOTE(review): stock PySpark's worker reads the attempt number as an int
+    # and the task attempt id as a long, and stores these values on camelCase
+    # attributes (``_stageId``, ``_partitionId``, ...). The widths and names
+    # below are kept as committed -- confirm against the JVM-side writer.
+    stage_id = read_int(infile)
+    partition_id = read_int(infile)
+    attempt_number = read_long(infile)
+    task_attempt_id = read_int(infile)
+    cpus = read_int(infile)
+
+    task_context = TaskContext._getOrCreate()
+    task_context._stage_id = stage_id
+    task_context._partition_id = partition_id
+    task_context._attempt_number = attempt_number
+    task_context._task_attempt_id = task_attempt_id
+    task_context._cpus = cpus
+
+    # Reset the mapping once, before the loop. Resetting it per resource (as
+    # before) dropped every resource except the last one and left a stale
+    # mapping in place when the task declared no resources at all.
+    task_context._resources = {}
+    for _ in range(read_int(infile)):
+        key = utf_serde.loads(infile)
+        name = utf_serde.loads(infile)
+        addresses = [utf_serde.loads(infile) for _ in range(read_int(infile))]
+        task_context._resources[key] = ResourceInformation(name, addresses)
+
+    task_context._localProperties = {}
+    for _ in range(read_int(infile)):
+        k = utf_serde.loads(infile)
+        v = utf_serde.loads(infile)
+        task_context._localProperties[k] = v
+
+    return task_context
+
+
+def resolve_python_path(utf_serde: UTF8Deserializer, infile):
+    """Point ``SparkFiles`` at the files directory and extend ``sys.path``.
+
+    Reads the SparkFiles root directory, then a count followed by that many
+    include file names; the root and each ``root/<name>`` are inserted at
+    ``sys.path[1]`` unless already present. Import caches are invalidated
+    afterwards.
+    """
+    root_dir = utf_serde.loads(infile)
+
+    SparkFiles._root_directory = root_dir
+    SparkFiles._is_running_on_worker = True
+
+    def _prepend_once(entry: str) -> None:
+        # The worker may be reused, so never add the same path twice.
+        if entry in sys.path:
+            return
+        # Index 1 so the entry takes precedence over system packages.
+        sys.path.insert(1, entry)
+
+    # *.py files that were added are copied into the root directory itself.
+    _prepend_once(root_dir)
+    for _ in range(read_int(infile)):
+        include_name = utf_serde.loads(infile)
+        _prepend_once(os.path.join(root_dir, include_name))
+
+    importlib.invalidate_caches()
+
+
+def check_broadcast_variables(infile):
+    """Consume the broadcast header and reject any use of broadcast variables.
+
+    Reads a bool (decryption server needed) and an int (number of broadcast
+    variables); raises ``PySparkRuntimeError`` if either indicates that
+    broadcast variables are in use.
+    """
+    decryption_server_needed = read_bool(infile)
+    broadcast_count = read_int(infile)
+
+    if not decryption_server_needed and broadcast_count <= 0:
+        return
+
+    raise PySparkRuntimeError(
+        error_class="BROADCAST_VARS_NOT_SUPPORTED",
+        message_parameters={
+            "worker_version": str(sys.version_info[:2]),
+            "message": "Broadcast variables are not supported by SedonaDB vectorized functions.",
+        },
+    )
+
+
+def get_runner_conf(utf_serde: UTF8Deserializer, infile):
+    """Read a count followed by that many key/value string pairs into a dict."""
+    entries = {}
+    for _ in range(read_int(infile)):
+        conf_key = utf_serde.loads(infile)
+        entries[conf_key] = utf_serde.loads(infile)
+    return entries
+
+
+def read_command(serializer, infile):
+    """Return one length-prefixed object read from ``infile`` by ``serializer``."""
+    return serializer._read_with_length(infile)
+
+
+def read_udf(infile, pickle_ser) -> UDFInfo:
+    """Read one UDF description from the stream and wrap it in ``UDFInfo``.
+
+    Wire order: argument count, that many argument offsets, command count,
+    then that many ``(function, return_type)`` commands. When several
+    commands are sent only the last one is kept.
+
+    Raises:
+        ValueError: if the stream carries no command for this UDF.
+    """
+    arg_offsets = [read_int(infile) for _ in range(read_int(infile))]
+
+    function = None
+    return_type = None
+
+    for _ in range(read_int(infile)):
+        function, return_type = read_command(pickle_ser, infile)
+
+    # Fail with a clear message instead of "'NoneType' object is not callable".
+    if function is None:
+        raise ValueError("No Python function was received for the SedonaDB UDF.")
+
+    sedona_db_udf_expression = function()
+
+    return UDFInfo(
+        arg_offsets=arg_offsets,
+        function=sedona_db_udf_expression,
+        return_type=return_type,
+        name=sedona_db_udf_expression._name,
+        # Placeholder with the declared dict type (offset -> SRID); behaves
+        # like the former ``[0]`` for membership and lookup of offset 0.
+        geom_offsets={0: 0},
+    )
+
+
+def register_sedona_db_udf(infile, pickle_ser) -> UDFInfo:
+    """Read a UDF count and that many UDFs; return the last one read.
+
+    Returns ``None`` when the count is zero.
+    """
+    last_udf = None
+    remaining = read_int(infile)
+    while remaining > 0:
+        last_udf = read_udf(infile, pickle_ser)
+        remaining -= 1
+
+    return last_udf
+
+
+def report_times(outfile, boot, init, finish):
+    """Write the ``TIMING_DATA`` marker followed by the three times in ms."""
+    write_int(SpecialLengths.TIMING_DATA, outfile)
+    for seconds in (boot, init, finish):
+        write_long(int(1000 * seconds), outfile)
+
+
+def write_statistics(infile, outfile, boot_time, init_time) -> None:
+    """Clear the task context, emit timing/spill statistics and end the task.
+
+    After the statistics and the end-of-data marker are written, one int is
+    read back from ``infile``. If it is ``END_OF_STREAM`` the same marker is
+    echoed; otherwise ``END_OF_DATA_SECTION`` is written and the process
+    exits with status -1.
+    """
+    TaskContext._setTaskContext(None)
+    report_times(outfile, boot_time, init_time, time.time())
+    for spilled_bytes in (shuffle.MemoryBytesSpilled, shuffle.DiskBytesSpilled):
+        write_long(spilled_bytes, outfile)
+
+    write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
+
+    stream_ended = read_int(infile) == SpecialLengths.END_OF_STREAM
+    reply = (
+        SpecialLengths.END_OF_STREAM
+        if stream_ended
+        else SpecialLengths.END_OF_DATA_SECTION
+    )
+    write_int(reply, outfile)
+    outfile.flush()
+    if not stream_ended:
+        sys.exit(-1)
+
+
+def main(infile, outfile):
+    """Serve one task: perform the handshake, run the UDF, report statistics.
+
+    Everything is read from ``infile`` in a fixed order, so the sequence of
+    calls below must not be rearranged. Results and statistics are written
+    to ``outfile``.
+    """
+    boot_time = time.time()
+    sedona_db = sedonadb.connect()
+    # Deserializers shared by the handshake steps below.
+    utf8_deserializer = UTF8Deserializer()
+    pickle_ser = CPickleSerializer()
+
+    # Read to keep the stream aligned; the value is not used afterwards.
+    split_index = read_int(infile)
+
+    check_python_version(utf8_deserializer, infile)
+
+    check_barrier_flag(infile)
+
+    task_context = assign_task_context(utf_serde=utf8_deserializer, 
infile=infile)
+    # Reset the spill counters that write_statistics reports at the end.
+    shuffle.MemoryBytesSpilled = 0
+    shuffle.DiskBytesSpilled = 0
+
+    resolve_python_path(utf8_deserializer, infile)
+
+    check_broadcast_variables(infile)
+
+    # Read to keep the stream aligned; the value is not used afterwards.
+    eval_type = read_int(infile)
+
+    runner_conf = get_runner_conf(utf8_deserializer, infile)
+
+    udf = register_sedona_db_udf(infile, pickle_ser)
+
+    sedona_db.register_udf(udf.function)
+    init_time = time.time()
+
+    cast_to_wkb = read_bool(infile)
+
+    serde = SedonaDBSerializer(
+        timezone=runner_conf.get("spark.sql.session.timeZone", "UTC"),
+        safecheck=False,
+        db=sedona_db,
+        udf_info=udf,
+        cast_to_wkb=cast_to_wkb,
+    )
+
+    # Geometry metadata: a count followed by (column index, SRID) int pairs.
+    number_of_geometries = read_int(infile)
+    geom_offsets = {}
+    for i in range(number_of_geometries):
+        geom_index = read_int(infile)
+        geom_srid = read_int(infile)
+
+        geom_offsets[geom_index] = geom_srid
+
+    # `udf` is the same object that was handed to the serializer above, so
+    # this assignment is visible through `serde` as well.
+    udf.geom_offsets = geom_offsets
+
+    iterator = serde.load_stream(infile)
+    out_iterator = apply_iterator(
+        db=sedona_db, iterator=iterator, udf_info=udf, cast_to_wkb=cast_to_wkb
+    )
+
+    serde.dump_stream(out_iterator, outfile)
+
+    write_statistics(infile, outfile, boot_time=boot_time, init_time=init_time)
+
+
+if __name__ == "__main__":
+    # Stand-alone entry point: connect back to the port published through
+    # PYTHON_WORKER_FACTORY_PORT, authenticating with
+    # PYTHON_WORKER_FACTORY_SECRET (both must be set in the environment).
+    auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"]
+    java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
+    (sock_file, sc) = local_connect_and_auth(java_port, auth_secret)
+
+    # Announce this process id before starting the task protocol.
+    write_int(os.getpid(), sock_file)
+    sock_file.flush()
+
+    # The same socket file is used for both reading and writing.
+    main(sock_file, sock_file)
diff --git a/python/setup.py b/python/setup.py
new file mode 100644
index 0000000000..ae5e7bf174
--- /dev/null
+++ b/python/setup.py
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from setuptools import setup
+import numpy
+
+# Pass NumPy's header directory as an include path so that C sources which
+# include <numpy/...> headers can be compiled.
+setup(
+    include_dirs=[numpy.get_include()],
+)
diff --git a/python/src/geomserde_speedup_module.c 
b/python/src/geomserde_speedup_module.c
index a95ced29e5..b84642488e 100644
--- a/python/src/geomserde_speedup_module.c
+++ b/python/src/geomserde_speedup_module.c
@@ -19,6 +19,9 @@
 
 #define PY_SSIZE_T_CLEAN
 #include <Python.h>
+#include <numpy/ndarraytypes.h>
+#include <numpy/npy_3kcompat.h>
+#include <numpy/ufuncobject.h>
 #include <stdio.h>
 
 #include "geomserde.h"
@@ -262,7 +265,110 @@ static PyObject *deserialize_1(PyObject *self, PyObject 
*args) {
   return Py_BuildValue("(Kibi)", geom, geom_type_id, has_z, length);
 }
 
+/*
+ * Serialize every geometry of an array-like into Sedona's binary format.
+ *
+ * Takes one positional argument that NumPy can view as an object array and
+ * returns a new 1-D object array with one serialized value per input
+ * element (in flat C order). Returns NULL with a Python exception set on
+ * failure.
+ */
+static PyObject *to_sedona_func(PyObject *self, PyObject *args) {
+  import_array();
+  PyObject *input_obj = NULL;
+  if (!PyArg_ParseTuple(args, "O", &input_obj)) {
+    return NULL;
+  }
+
+  GEOSContextHandle_t handle = get_geos_context_handle();
+  if (handle == NULL) {
+    return NULL;
+  }
+
+  /* Coerce instead of blindly casting: this rejects non-array input with a
+   * Python error and guarantees a C-contiguous object array, which the flat
+   * PyObject** indexing below relies on. Returns a new reference. */
+  PyArrayObject *array = (PyArrayObject *)PyArray_FROM_OTF(
+      input_obj, NPY_OBJECT, NPY_ARRAY_IN_ARRAY);
+  if (array == NULL) {
+    return NULL;
+  }
+
+  npy_intp n = PyArray_SIZE(array);
+  npy_intp dims[1] = {n};
+  PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT);
+  if (out == NULL) {
+    Py_DECREF(array);
+    return NULL;
+  }
+
+  PyObject **objs = (PyObject **)PyArray_DATA(array);
+  for (npy_intp i = 0; i < n; i++) {
+    PyObject *obj = objs[i];
+    GEOSGeometry *geos_geom = NULL;
+    /* Object arrays may contain NULL slots; never hand those to GEOS. */
+    char success = obj != NULL && PyGEOS_GetGEOSGeometry(obj, &geos_geom);
+    if (!success || geos_geom == NULL) {
+      PyErr_SetString(PyExc_TypeError, "Invalid GEOS geometry");
+      goto fail;
+    }
+
+    PyObject *serialized = do_serialize(geos_geom);
+    if (!serialized) {
+      goto fail;
+    }
+
+    /* SETITEM stores its own reference, so ours is released either way. */
+    int rc = PyArray_SETITEM(out, PyArray_GETPTR1(out, i), serialized);
+    Py_DECREF(serialized);
+    if (rc < 0) {
+      goto fail;
+    }
+  }
+
+  Py_DECREF(array);
+  return (PyObject *)out;
+
+fail:
+  Py_DECREF(array);
+  Py_DECREF(out);
+  return NULL;
+}
 /* Module definition for Shapely 2.x */
+/*
+ * Deserialize every Sedona-serialized bytes value of an array-like.
+ *
+ * Takes one positional argument that NumPy can view as an object array of
+ * `bytes` and returns a new 1-D object array with one geometry object per
+ * input element (in flat C order). Returns NULL with a Python exception set
+ * on failure.
+ */
+static PyObject *from_sedona_func(PyObject *self, PyObject *args) {
+  import_array();
+  PyObject *input_obj = NULL;
+  if (!PyArg_ParseTuple(args, "O", &input_obj)) {
+    return NULL;
+  }
+
+  /* Same guard as to_sedona_func: do not pass a NULL context to GEOS. */
+  GEOSContextHandle_t handle = get_geos_context_handle();
+  if (handle == NULL) {
+    return NULL;
+  }
+
+  /* Coerce instead of blindly casting: this rejects non-array input with a
+   * Python error and guarantees a C-contiguous object array, which the flat
+   * PyObject** indexing below relies on. Returns a new reference. */
+  PyArrayObject *array = (PyArrayObject *)PyArray_FROM_OTF(
+      input_obj, NPY_OBJECT, NPY_ARRAY_IN_ARRAY);
+  if (array == NULL) {
+    return NULL;
+  }
+
+  npy_intp n = PyArray_SIZE(array);
+  npy_intp dims[1] = {n};
+  PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT);
+  if (out == NULL) {
+    Py_DECREF(array);
+    return NULL;
+  }
+
+  PyObject **objs = (PyObject **)PyArray_DATA(array);
+  int p_bytes_read = 0;
+
+  for (npy_intp i = 0; i < n; i++) {
+    PyObject *obj = objs[i];
+    /* Object arrays may contain NULL slots; check before dereferencing. */
+    if (obj == NULL || !PyBytes_Check(obj)) {
+      PyErr_SetString(PyExc_TypeError, "Expected bytes");
+      goto fail;
+    }
+
+    char *buf = PyBytes_AS_STRING(obj);
+    Py_ssize_t len = PyBytes_GET_SIZE(obj);
+
+    GEOSGeometry *geom = NULL;
+    SedonaErrorCode err =
+        sedona_deserialize_geom(handle, buf, len, &geom, &p_bytes_read);
+    if (err != SEDONA_SUCCESS) {
+      handle_geomserde_error(err);
+      goto fail;
+    }
+
+    /* NOTE(review): if PyGEOS_CreateGeometry does not take ownership of
+     * `geom` on failure, the geometry leaks on this error path -- confirm. */
+    PyObject *pygeom = PyGEOS_CreateGeometry(geom, handle);
+    if (!pygeom) {
+      goto fail;
+    }
+
+    /* SETITEM stores its own reference, so ours is released either way. */
+    int rc = PyArray_SETITEM(out, PyArray_GETPTR1(out, i), pygeom);
+    Py_DECREF(pygeom);
+    if (rc < 0) {
+      goto fail;
+    }
+  }
+
+  Py_DECREF(array);
+  return (PyObject *)out;
+
+fail:
+  Py_DECREF(array);
+  Py_DECREF(out);
+  return NULL;
+}
 
 static PyMethodDef geomserde_methods_shapely_2[] = {
     {"load_libgeos_c", load_libgeos_c, METH_VARARGS, "Load libgeos_c."},
@@ -270,6 +376,10 @@ static PyMethodDef geomserde_methods_shapely_2[] = {
      "Serialize geometry object as bytearray."},
     {"deserialize", deserialize, METH_VARARGS,
      "Deserialize bytes-like object to geometry object."},
+    {"from_sedona_func", from_sedona_func, METH_VARARGS,
+     "Deserialize an array of Sedona-serialized bytes to geometry objects."},
+    {"to_sedona_func", to_sedona_func, METH_VARARGS,
+     "Serialize an array of geometry objects to Sedona-serialized bytes."},
     {NULL, NULL, 0, NULL}, /* Sentinel */
 };
 
diff --git a/python/tests/test_base.py b/python/tests/test_base.py
index cc2b09e422..3974930207 100644
--- a/python/tests/test_base.py
+++ b/python/tests/test_base.py
@@ -22,7 +22,7 @@ from typing import Iterable, Union
 import pyspark
 from pyspark.sql import DataFrame
 
-from sedona.spark import *
+from sedona.spark import SedonaContext
 from sedona.spark.utils.decorators import classproperty
 
 SPARK_REMOTE = os.getenv("SPARK_REMOTE")
@@ -70,7 +70,15 @@ class TestBase:
                     "spark.sedona.stac.load.itemsLimitMax",
                     "20",
                 )
-                # Pandas on PySpark doesn't work with ANSI mode, which is 
enabled by default
+                .config("spark.executor.memory", "10G")
+                .config("spark.driver.memory", "10G")
+                .config(
+                    "sedona.python.worker.udf.daemon.module",
+                    "sedona.spark.worker.daemon",
+                )
+                .config(
+                    "sedona.python.worker.daemon.enabled", "true"
+                )  # Pandas on PySpark doesn't work with ANSI mode, which is 
enabled by default
                 # in Spark 4
                 .config("spark.sql.ansi.enabled", "false")
             )
diff --git a/python/tests/utils/test_sedona_db_vectorized_udf.py 
b/python/tests/utils/test_sedona_db_vectorized_udf.py
new file mode 100644
index 0000000000..4809c7faa4
--- /dev/null
+++ b/python/tests/utils/test_sedona_db_vectorized_udf.py
@@ -0,0 +1,147 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import numpy as np
+import pyspark
+import pytest
+
+from sedona.spark.sql.functions import sedona_db_vectorized_udf
+from sedona.spark.utils.geometry_serde import to_sedona, from_sedona
+from tests.test_base import TestBase
+import pyarrow as pa
+import shapely
+from sedona.sql import GeometryType
+from pyspark.sql.functions import expr, lit
+from pyspark.sql.types import DoubleType, IntegerType, ByteType
+from sedona.spark.sql import ST_X
+
+
+class TestSedonaDBArrowFunction(TestBase):
+    """End-to-end tests for UDFs declared with ``sedona_db_vectorized_udf``."""
+
+    # Compare versions numerically: comparing the raw strings is
+    # lexicographic and mis-orders multi-digit components.
+    _pyspark_major_minor = tuple(
+        int(part) for part in pyspark.__version__.split(".")[:2]
+    )
+    requires_pyspark_3_5 = pytest.mark.skipif(
+        not ((3, 5) <= _pyspark_major_minor < (4, 0)),
+        reason="SedonaDB vectorized UDFs are only tested on pyspark >= 3.5 and < 4.0",
+    )
+
+    @pytest.mark.vectorized
+    @requires_pyspark_3_5
+    def test_vectorized_udf(self):
+        @sedona_db_vectorized_udf(
+            return_type=GeometryType(), input_types=[ByteType(), IntegerType()]
+        )
+        def my_own_function(geom, distance):
+            geom_wkb = pa.array(geom.storage.to_array())
+            geometry_array = np.asarray(geom_wkb, dtype=object)
+            # Not used in the result; converting it checks that the second
+            # argument is delivered in a usable form.
+            distance = pa.array(distance.to_array())
+            geom = from_sedona(geometry_array)
+            result_shapely = shapely.centroid(geom)
+
+            return pa.array(to_sedona(result_shapely))
+
+        df = self.spark.createDataFrame(
+            [
+                (1, "POINT (1 1)"),
+                (2, "POINT (2 2)"),
+                (3, "POINT (3 3)"),
+            ],
+            ["id", "wkt"],
+        ).withColumn("wkt", expr("ST_GeomFromWKT(wkt)"))
+
+        rows = df.select(
+            ST_X(my_own_function(df.wkt, lit(100))).alias("x_coord")
+        ).collect()
+
+        # The centroid of a point is the point itself.
+        assert [row["x_coord"] for row in rows] == [1.0, 2.0, 3.0]
+
+    @pytest.mark.vectorized
+    @requires_pyspark_3_5
+    def test_geometry_to_double(self):
+        @sedona_db_vectorized_udf(return_type=DoubleType(), input_types=[ByteType()])
+        def geometry_to_non_geometry_udf(geom):
+            geom_wkb = pa.array(geom.storage.to_array())
+            geometry_array = np.asarray(geom_wkb, dtype=object)
+            geom = from_sedona(geometry_array)
+
+            result_shapely = shapely.get_x(shapely.centroid(geom))
+
+            return pa.array(result_shapely)
+
+        df = self.spark.createDataFrame(
+            [(1, "POINT (1 1)"), (2, "POINT (2 2)"), (3, "POINT (3 3)")],
+            ["id", "wkt"],
+        ).withColumn("wkt", expr("ST_GeomFromWKT(wkt)"))
+
+        values = df.select(
+            geometry_to_non_geometry_udf(df.wkt).alias("x_coord")
+        ).collect()
+
+        values_list = [row["x_coord"] for row in values]
+
+        assert values_list == [1.0, 2.0, 3.0]
+
+    @pytest.mark.vectorized
+    @requires_pyspark_3_5
+    def test_geometry_to_int(self):
+        @sedona_db_vectorized_udf(return_type=IntegerType(), input_types=[ByteType()])
+        def geometry_to_int(geom):
+            geom_wkb = pa.array(geom.storage.to_array())
+            geometry_array = np.asarray(geom_wkb, dtype=object)
+
+            geom = from_sedona(geometry_array)
+
+            result_shapely = shapely.get_num_points(geom)
+
+            return pa.array(result_shapely)
+
+        df = self.spark.createDataFrame(
+            [(1, "POINT (1 1)"), (2, "POINT (2 2)"), (3, "POINT (3 3)")],
+            ["id", "wkt"],
+        ).withColumn("wkt", expr("ST_GeomFromWKT(wkt)"))
+
+        values = df.select(geometry_to_int(df.wkt)).collect()
+
+        values_list = [row[0] for row in values]
+
+        # get_num_points counts vertices of linestrings/linearrings only, so
+        # point inputs are expected to yield 0.
+        assert values_list == [0, 0, 0]
+
+    @pytest.mark.vectorized
+    @requires_pyspark_3_5
+    def test_geometry_crs_preservation(self):
+        @sedona_db_vectorized_udf(return_type=GeometryType(), input_types=[ByteType()])
+        def return_same_geometry(geom):
+            geom_wkb = pa.array(geom.storage.to_array())
+            geometry_array = np.asarray(geom_wkb, dtype=object)
+
+            geom = from_sedona(geometry_array)
+
+            return pa.array(to_sedona(geom))
+
+        df = self.spark.createDataFrame(
+            [(1, "POINT (1 1)"), (2, "POINT (2 2)"), (3, "POINT (3 3)")],
+            ["id", "wkt"],
+        ).withColumn("wkt", expr("ST_SetSRID(ST_GeomFromWKT(wkt), 3857)"))
+
+        result_df = df.select(return_same_geometry(df.wkt).alias("geom"))
+
+        crs_list = (
+            result_df.selectExpr("ST_SRID(geom)").rdd.flatMap(lambda x: x).collect()
+        )
+
+        assert crs_list == [3857, 3857, 3857]
diff --git 
a/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala 
b/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
index b0e46cf6e9..c9e8497f7e 100644
--- a/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
@@ -72,7 +72,7 @@ object SedonaContext {
 
     val sedonaArrowStrategy = Try(
       Class
-        .forName("org.apache.spark.sql.udf.SedonaArrowStrategy")
+        .forName("org.apache.spark.sql.execution.python.SedonaArrowStrategy")
         .getDeclaredConstructor()
         .newInstance()
         .asInstanceOf[SparkStrategy])
diff --git 
a/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala 
b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala
index aece26267d..98b24e256e 100644
--- a/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala
@@ -23,7 +23,16 @@ object PythonEvalType {
   val SQL_SCALAR_SEDONA_UDF = 5200
   val SEDONA_UDF_TYPE_CONSTANT = 5000
 
+  // sedona db eval types
+  val SQL_SCALAR_SEDONA_DB_UDF = 6200
+  val SQL_SCALAR_SEDONA_DB_SPEEDUP_UDF = 6201
+  val SEDONA_DB_UDF_TYPE_CONSTANT = 6000
+
   def toString(pythonEvalType: Int): String = pythonEvalType match {
     case SQL_SCALAR_SEDONA_UDF => "SQL_SCALAR_GEO_UDF"
+    case SQL_SCALAR_SEDONA_DB_UDF => "SQL_SCALAR_SEDONA_DB_UDF"
+    // Without this case the speed-up eval type would throw a MatchError.
+    case SQL_SCALAR_SEDONA_DB_SPEEDUP_UDF => "SQL_SCALAR_SEDONA_DB_SPEEDUP_UDF"
   }
+
+  /** The set of every Sedona-specific eval type declared in this object. */
+  def evals(): Set[Int] =
+    Set(SQL_SCALAR_SEDONA_UDF, SQL_SCALAR_SEDONA_DB_UDF, 
SQL_SCALAR_SEDONA_DB_SPEEDUP_UDF)
 }
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala
new file mode 100644
index 0000000000..3055e768b9
--- /dev/null
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.api.python._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * Python runner for Sedona UDFs that exchanges data with the Python worker
+ * as an Arrow stream (input via `SedonaBasicPythonArrowInput`, output via
+ * `SedonaBasicPythonArrowOutput`).
+ *
+ * `geometryFields` and `castGeometryToWKB` are forwarded unchanged to
+ * `SedonaBasePythonRunner`.
+ */
+class SedonaArrowPythonRunner(
+    funcs: Seq[ChainedPythonFunctions],
+    evalType: Int,
+    argOffsets: Array[Array[Int]],
+    protected override val schema: StructType,
+    protected override val timeZoneId: String,
+    protected override val largeVarTypes: Boolean,
+    protected override val workerConf: Map[String, String],
+    val pythonMetrics: Map[String, SQLMetric],
+    jobArtifactUUID: Option[String],
+    geometryFields: Seq[(Int, Int)],
+    castGeometryToWKB: Boolean = false)
+    extends SedonaBasePythonRunner[Iterator[InternalRow], ColumnarBatch](
+      funcs,
+      evalType,
+      argOffsets,
+      jobArtifactUUID,
+      geometryFields,
+      castGeometryToWKB)
+    with SedonaBasicPythonArrowInput
+    with SedonaBasicPythonArrowOutput {
+
+  override val errorOnDuplicatedFieldNames: Boolean = true
+
+  // Taken from the pandas UDF buffer size setting. Note that the check
+  // accepts exactly 4 although the message says "more than 4 bytes".
+  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+  require(
+    bufferSize >= 4,
+    "Pandas execution requires more than 4 bytes. Please set higher buffer. " +
+      s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")
+}
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala
new file mode 100644
index 0000000000..9deb572edf
--- /dev/null
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.sedona.sql.UDF.PythonEvalType
+import 
org.apache.sedona.sql.UDF.PythonEvalType.{SQL_SCALAR_SEDONA_DB_SPEEDUP_UDF, 
SQL_SCALAR_SEDONA_DB_UDF, SQL_SCALAR_SEDONA_UDF}
+import org.apache.spark.api.python.ChainedPythonFunctions
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
GenericInternalRow, PythonUDF}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.udf.SedonaArrowEvalPython
+import org.apache.spark.{JobArtifactSet, TaskContext}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+// A custom Strategy is used to avoid Apache Spark's assertion on types. It
+// plans every SedonaArrowEvalPython logical node as a
+// SedonaArrowEvalPythonExec and leaves all other nodes to other strategies.
+// This could be extended to support other engines working with Arrow data.
+class SedonaArrowStrategy extends Strategy {
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case SedonaArrowEvalPython(udfs, output, child, evalType) =>
+      SedonaArrowEvalPythonExec(udfs, output, planLater(child), evalType) :: 
Nil
+    case _ => Nil
+  }
+}
+
+// A modification of Apache Spark's ArrowEvalPythonExec: the check on the
+// types is removed to allow geometry types. This is an initial version that
+// enables vectorized UDFs for Sedona geometry types; it could be extended to
+// support other engines working with Arrow data.
+case class SedonaArrowEvalPythonExec(
+    udfs: Seq[PythonUDF],
+    resultAttrs: Seq[Attribute],
+    child: SparkPlan,
+    evalType: Int)
+    extends EvalPythonExec
+    with PythonSQLMetrics {
+
+  private val batchSize = conf.arrowMaxRecordsPerBatch
+  private val sessionLocalTimeZone = conf.sessionLocalTimeZone
+  private val largeVarTypes = conf.arrowUseLargeVarTypes
+  private val pythonRunnerConf =
+    Map[String, String](SQLConf.SESSION_LOCAL_TIMEZONE.key -> conf.sessionLocalTimeZone)
+  private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
+
+  /**
+   * Infers one SRID per geometry column by inspecting the first row.
+   *
+   * @param iterator
+   *   row iterator; at most one row is consumed from it
+   * @param schema
+   *   schema of the rows
+   * @return
+   *   `(column index in schema, srid)` for every geometry column; the SRID is 0 when the first
+   *   row is not a `GenericInternalRow`, the value is null, or the SRID flag is not set. Empty
+   *   when the iterator has no rows.
+   */
+  private def inferCRS(iterator: Iterator[InternalRow], schema: StructType): Seq[(Int, Int)] = {
+    // this triggers the iterator
+    if (!iterator.hasNext) {
+      return Seq.empty
+    }
+
+    val row = iterator.next()
+
+    val rowMatched = row match {
+      case generic: GenericInternalRow =>
+        Some(generic)
+      case _ => None
+    }
+
+    // Index the fields BEFORE filtering so that `index` is the column position
+    // in the schema (and in the row). Indexing after the filter yielded the
+    // position among geometry columns only, which read and reported the wrong
+    // column whenever a non-geometry column preceded a geometry column.
+    schema.fields.toSeq.zipWithIndex
+      .filter { case (field, _) =>
+        field.dataType == GeometryUDT
+      }
+      .map { case (_, index) =>
+        if (rowMatched.isEmpty || rowMatched.get.values(index) == null) (index, 0)
+        else {
+          val geom = rowMatched.get.get(index, GeometryUDT).asInstanceOf[Array[Byte]]
+          // The lowest bit of the first byte flags the presence of an SRID,
+          // which is then read big-endian from the following three bytes.
+          val preambleByte = geom(0) & 0xff
+          val hasSrid = (preambleByte & 0x01) != 0
+
+          val srid =
+            if (hasSrid) {
+              ((geom(1) & 0xff) << 16) | ((geom(2) & 0xff) << 8) | (geom(3) & 0xff)
+            } else 0
+
+          (index, srid)
+        }
+      }
+  }
+
+  protected override def evaluate(
+      funcs: Seq[ChainedPythonFunctions],
+      argOffsets: Array[Array[Int]],
+      iter: Iterator[InternalRow],
+      schema: StructType,
+      context: TaskContext): Iterator[InternalRow] = {
+    // Duplicate so that peeking at the first row for SRID inference does not
+    // remove it from the data that is sent to Python.
+    val (probe, full) = iter.duplicate
+
+    val geometryFields = inferCRS(probe, schema)
+
+    val batchIter = if (batchSize > 0) new BatchIterator(full, batchSize) else Iterator(full)
+
+    evalType match {
+      case SQL_SCALAR_SEDONA_DB_UDF | SQL_SCALAR_SEDONA_DB_SPEEDUP_UDF =>
+        val columnarBatchIter = new SedonaArrowPythonRunner(
+          funcs,
+          // NOTE(review): presumably Spark's scalar pandas UDF eval type -- confirm.
+          200,
+          argOffsets,
+          schema,
+          sessionLocalTimeZone,
+          largeVarTypes,
+          pythonRunnerConf,
+          pythonMetrics,
+          jobArtifactUUID,
+          geometryFields,
+          // Geometry casting is requested only for the non-speed-up variant.
+          evalType == SQL_SCALAR_SEDONA_DB_UDF)
+          .compute(batchIter, context.partitionId(), context)
+
+        columnarBatchIter.flatMap { batch =>
+          batch.rowIterator.asScala
+        }
+
+      case SQL_SCALAR_SEDONA_UDF =>
+        val columnarBatchIter = new ArrowPythonRunner(
+          funcs,
+          evalType - PythonEvalType.SEDONA_UDF_TYPE_CONSTANT,
+          argOffsets,
+          schema,
+          sessionLocalTimeZone,
+          largeVarTypes,
+          pythonRunnerConf,
+          pythonMetrics,
+          jobArtifactUUID).compute(batchIter, context.partitionId(), context)
+
+        columnarBatchIter.flatMap { batch =>
+          batch.rowIterator.asScala
+        }
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    copy(child = newChild)
+}
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
new file mode 100644
index 0000000000..055d5db15f
--- /dev/null
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import java.net._
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.collection.JavaConverters._
+import org.apache.spark._
+import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.EXECUTOR_CORES
+import org.apache.spark.internal.config.Python._
+import 
org.apache.spark.resource.ResourceProfile.{EXECUTOR_CORES_LOCAL_PROPERTY, 
PYSPARK_MEMORY_LOCAL_PROPERTY}
+import org.apache.spark.util._
+
+private object SedonaBasePythonRunner {
+
+  /**
+   * Temporary directory whose path is exported to Python workers as PYTHON_FAULTHANDLER_DIR.
+   * Being lazy, it is only created the first time a runner actually reads it.
+   */
+  private lazy val faultHandlerLogDir: java.io.File =
+    Utils.createTempDir(namePrefix = "faulthandler")
+}
+
+/**
+ * Base runner whose `compute` obtains its Python worker socket from [[WorkerContext]] and then
+ * wires up the writer thread and reader iterator supplied by the concrete subclass.
+ *
+ * @param funcs
+ *   chained Python functions, forwarded to `BasePythonRunner`
+ * @param evalType
+ *   evaluation type, forwarded to `BasePythonRunner`
+ * @param argOffsets
+ *   argument offsets, one array per entry in `funcs`
+ * @param jobArtifactUUID
+ *   exported to the worker as SPARK_JOB_ARTIFACT_UUID ("default" when empty)
+ * @param geometryFields
+ *   pairs written to the worker by `SedonaPythonArrowInput` as (field index, SRID)
+ * @param castGeometryToWKB
+ *   boolean flag written to the worker by `SedonaPythonArrowInput`
+ */
+private[spark] abstract class SedonaBasePythonRunner[IN, OUT](
+    funcs: Seq[ChainedPythonFunctions],
+    evalType: Int,
+    argOffsets: Array[Array[Int]],
+    jobArtifactUUID: Option[String],
+    val geometryFields: Seq[(Int, Int)] = Seq.empty,
+    val castGeometryToWKB: Boolean = false)
+    extends BasePythonRunner[IN, OUT](funcs, evalType, argOffsets, jobArtifactUUID)
+    with Logging {
+
+  require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")
+
+  private val conf = SparkEnv.get.conf
+  private val reuseWorker =
+    conf.getBoolean(PYTHON_WORKER_REUSE.key, PYTHON_WORKER_REUSE.defaultValue.get)
+  private val faultHandlerEnabled = conf.get(PYTHON_WORKER_FAULTHANLDER_ENABLED)
+
+  /** Splits the executor-wide Python memory evenly across the executor cores. */
+  private def getWorkerMemoryMb(mem: Option[Long], cores: Int): Option[Long] =
+    mem.map(totalMb => totalMb / cores)
+
+  import java.io._
+
+  /**
+   * Populates the worker environment, acquires a worker socket, starts the writer thread and
+   * returns an interruptible iterator over the worker's output.
+   */
+  override def compute(
+      inputIterator: Iterator[IN],
+      partitionIndex: Int,
+      context: TaskContext): Iterator[OUT] = {
+    val startTime = System.currentTimeMillis
+    val env = SparkEnv.get
+
+    val coresProperty = Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY))
+    val pysparkMemoryMb =
+      Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong)
+
+    if (simplifiedTraceback) {
+      envVars.put("SPARK_SIMPLIFIED_TRACEBACK", "1")
+    }
+    // SPARK-30299 this could be wrong with standalone mode when executor
+    // cores might not be correct because it defaults to all cores on the box.
+    val executorCores = coresProperty.map(_.toInt).getOrElse(conf.get(EXECUTOR_CORES))
+    for (perWorkerMb <- getWorkerMemoryMb(pysparkMemoryMb, executorCores)) {
+      envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", perWorkerMb.toString)
+    }
+    envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
+    envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+    if (faultHandlerEnabled) {
+      envVars.put("PYTHON_FAULTHANDLER_DIR", SedonaBasePythonRunner.faultHandlerLogDir.toString)
+    }
+
+    if (reuseWorker) {
+      envVars.put("SPARK_REUSE_WORKER", "1")
+    }
+
+    envVars.put("SPARK_JOB_ARTIFACT_UUID", jobArtifactUUID.getOrElse("default"))
+
+    val (worker, pid) = WorkerContext.createPythonWorker(pythonExec, envVars.asScala.toMap)
+
+    // Flipped to true by whoever first releases or closes the socket.
+    val releasedOrClosed = new AtomicBoolean(false)
+
+    // Thread that feeds the worker from our parent's iterator.
+    val writerThread = newWriterThread(env, worker, inputIterator, partitionIndex, context)
+
+    context.addTaskCompletionListener[Unit] { _ =>
+      writerThread.shutdownOnTaskCompletion()
+
+      // Close unless the worker is reusable and somebody already released/closed it.
+      val shouldClose = !reuseWorker || releasedOrClosed.compareAndSet(false, true)
+      if (shouldClose) {
+        try {
+          worker.close()
+        } catch {
+          case e: Exception =>
+            logWarning("Failed to close worker socket", e)
+        }
+      }
+    }
+
+    writerThread.start()
+
+    val workerOutput =
+      new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
+
+    val resultIterator = newReaderIterator(
+      workerOutput,
+      writerThread,
+      startTime,
+      env,
+      worker,
+      pid,
+      releasedOrClosed,
+      context)
+    new InterruptibleIterator(context, resultIterator)
+  }
+}
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala
new file mode 100644
index 0000000000..459388856b
--- /dev/null
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.{SparkException, SparkFiles}
+import org.apache.spark.api.python.PythonUtils
+import org.apache.spark.util.Utils
+
+import java.io.{DataInputStream, DataOutputStream, EOFException, File, 
InputStream}
+import java.net.{InetAddress, ServerSocket, Socket, SocketException}
+import java.util.Arrays
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import org.apache.spark._
+import org.apache.spark.errors.SparkCoreErrors
+import org.apache.spark.internal.Logging
+import org.apache.spark.security.SocketAuthHelper
+import 
org.apache.spark.sql.execution.python.SedonaPythonWorkerFactory.PROCESS_WAIT_TIMEOUT_MS
+import org.apache.spark.util.RedirectThread
+
+import javax.annotation.concurrent.GuardedBy
+
+class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String]) 
extends Logging {
+  self =>
+
+  private val simpleWorkers = new mutable.WeakHashMap[Socket, Process]()
+  private val authHelper = new SocketAuthHelper(SparkEnv.get.conf)
+  @GuardedBy("self")
+  private var daemon: Process = null
+  val daemonHost = InetAddress.getLoopbackAddress()
+  @GuardedBy("self")
+  private var daemonPort: Int = 0
+  @GuardedBy("self")
+  private val daemonWorkers = new mutable.WeakHashMap[Socket, Int]()
+  @GuardedBy("self")
+  private val idleWorkers = new mutable.Queue[Socket]()
+  @GuardedBy("self")
+  private var lastActivityNs = 0L
+
+  private val useDaemon: Boolean =
+    SparkEnv.get.conf.getBoolean("sedona.python.worker.daemon.enabled", false)
+
+  private val sedonaUDFWorkerModule =
+    SparkEnv.get.conf.get("sedona.python.worker.udf.module", 
"sedona.spark.worker.worker")
+
+  private val sedonaDaemonModule =
+    SparkEnv.get.conf.get("sedona.python.worker.udf.daemon.module", 
"sedona.spark.worker.daemon")
+
+  private val pythonPath = PythonUtils.mergePythonPaths(
+    PythonUtils.sparkPythonPath,
+    envVars.getOrElse("PYTHONPATH", ""),
+    sys.env.getOrElse("PYTHONPATH", ""))
+
+  /**
+   * Returns a worker socket together with the worker pid, when one is known.
+   *
+   * With the daemon enabled, a queued idle worker is handed out if there is one; otherwise a
+   * new worker is requested from the daemon. With the daemon disabled a dedicated worker
+   * process is launched on every call.
+   */
+  def create(): (Socket, Option[Int]) = {
+    if (!useDaemon) {
+      createSimpleWorker(sedonaUDFWorkerModule)
+    } else {
+      // The idle queue and the pid map are both guarded by `self`; read them under one lock.
+      val pooled = self.synchronized {
+        if (idleWorkers.isEmpty) {
+          None
+        } else {
+          val socket = idleWorkers.dequeue()
+          Some((socket, daemonWorkers.get(socket)))
+        }
+      }
+
+      // Falling back to the daemon happens outside the lock taken above.
+      pooled.getOrElse(createThroughDaemon())
+    }
+  }
+
+  /**
+   * Launches a dedicated Python process running `workerModule` and waits for it to connect back
+   * on a loopback server socket.
+   *
+   * If the handshake fails (accept timeout, authentication failure, negative pid) the spawned
+   * process is destroyed and any accepted socket is closed before the error is rethrown, so a
+   * failed launch does not leave an orphaned process or connection behind.
+   *
+   * @param workerModule
+   *   Python module executed with `-m`
+   * @return
+   *   the connected socket and the pid reported by the worker
+   * @throws SparkException
+   *   when the worker does not connect back and identify itself successfully
+   */
+  private def createSimpleWorker(workerModule: String): (Socket, Option[Int]) = {
+    var serverSocket: ServerSocket = null
+    try {
+      serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())
+
+      // Create and start the worker
+      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", workerModule))
+      val jobArtifactUUID = envVars.getOrElse("SPARK_JOB_ARTIFACT_UUID", "default")
+      if (jobArtifactUUID != "default") {
+        val f = new File(SparkFiles.getRootDirectory(), jobArtifactUUID)
+        f.mkdir()
+        pb.directory(f)
+      }
+      val workerEnv = pb.environment()
+      workerEnv.putAll(envVars.asJava)
+      workerEnv.put("PYTHONPATH", pythonPath)
+      // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
+      workerEnv.put("PYTHONUNBUFFERED", "YES")
+      workerEnv.put("PYTHON_WORKER_FACTORY_PORT", serverSocket.getLocalPort.toString)
+      workerEnv.put("PYTHON_WORKER_FACTORY_SECRET", authHelper.secret)
+      if (Utils.preferIPv6) {
+        workerEnv.put("SPARK_PREFER_IPV6", "True")
+      }
+      val worker = pb.start()
+
+      // Redirect worker stdout and stderr
+      redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)
+
+      // Wait for it to connect to our socket, and validate the auth secret.
+      serverSocket.setSoTimeout(10000)
+
+      // Declared outside the try so the failure path can close it.
+      var socket: Socket = null
+      try {
+        socket = serverSocket.accept()
+        authHelper.authClient(socket)
+        // TODO: When we drop JDK 8, we can just use worker.pid()
+        val pid = new DataInputStream(socket.getInputStream).readInt()
+        if (pid < 0) {
+          throw new IllegalStateException("Python failed to launch worker with code " + pid)
+        }
+        self.synchronized {
+          simpleWorkers.put(socket, worker)
+        }
+
+        (socket, Some(pid))
+      } catch {
+        case e: Exception =>
+          // The worker was never registered in simpleWorkers, so nothing else will ever
+          // clean it up: release the connection and the process here.
+          if (socket != null) {
+            try {
+              socket.close()
+            } catch {
+              case closeError: Exception =>
+                logWarning("Failed to close worker socket", closeError)
+            }
+          }
+          worker.destroy()
+          throw new SparkException("Python worker failed to connect back.", e)
+      }
+    } finally {
+      if (serverSocket != null) {
+        serverSocket.close()
+      }
+    }
+  }
+
+  /**
+   * Starts one `RedirectThread` per stream, copying the child's stdout and stderr to this
+   * JVM's `System.err`. Any exception raised while starting them is logged, not rethrown.
+   */
+  private def redirectStreamsToStderr(stdout: InputStream, stderr: InputStream): Unit = {
+    try {
+      // stdout first, then stderr; the label only affects the thread name.
+      Seq("stdout" -> stdout, "stderr" -> stderr).foreach { case (label, source) =>
+        new RedirectThread(source, System.err, label + " reader for " + pythonExec).start()
+      }
+    } catch {
+      case e: Exception =>
+        logError("Exception in redirecting streams", e)
+    }
+  }
+
+  /**
+   * Requests a new worker from the daemon, starting the daemon first if necessary.
+   *
+   * A `SocketException` on the first connection attempt is taken to mean the daemon died: it
+   * is stopped, started again, and the connection is attempted exactly once more.
+   */
+  private def createThroughDaemon(): (Socket, Option[Int]) = {
+
+    // Connects to the daemon, reads the worker pid it reports, authenticates, and records
+    // the socket -> pid association.
+    def connectToDaemon(): (Socket, Option[Int]) = {
+      val workerSocket = new Socket(daemonHost, daemonPort)
+      val workerPid = new DataInputStream(workerSocket.getInputStream).readInt()
+      if (workerPid < 0) {
+        throw new IllegalStateException(
+          "Python daemon failed to launch worker with code " + workerPid)
+      }
+
+      authHelper.authToServer(workerSocket)
+      daemonWorkers.put(workerSocket, workerPid)
+      (workerSocket, Some(workerPid))
+    }
+
+    self.synchronized {
+      // No-op when the daemon is already running.
+      startDaemon()
+
+      try {
+        connectToDaemon()
+      } catch {
+        case exc: SocketException =>
+          logWarning("Failed to open socket to Python daemon:", exc)
+          logWarning("Assuming that daemon unexpectedly quit, attempting to restart")
+          stopDaemon()
+          startDaemon()
+          connectToDaemon()
+      }
+    }
+  }
+
+  /**
+   * Shuts down whatever this factory is managing.
+   *
+   * In daemon mode: closes all idle worker sockets, destroys the daemon process if there is
+   * one, and resets `daemon` / `daemonPort`. Otherwise: destroys every simple worker process.
+   */
+  private def stopDaemon(): Unit = {
+    self.synchronized {
+      if (useDaemon) {
+        cleanupIdleWorkers()
+
+        // Request shutdown of existing daemon by sending SIGTERM
+        if (daemon != null) {
+          daemon.destroy()
+        }
+
+        daemon = null
+        daemonPort = 0
+      } else {
+        // `mapValues` only builds a lazy view, so `mapValues(_.destroy())` never ran destroy().
+        // Iterate the values eagerly instead.
+        simpleWorkers.values.foreach(_.destroy())
+      }
+    }
+  }
+
+  /**
+   * Starts the Python daemon process unless one is already running, and reads the port it
+   * reports on its standard output.
+   *
+   * On any failure the daemon's stderr is collected, the daemon is stopped, and the error is
+   * rethrown; when stderr is non-empty it is folded into a `SparkException` that keeps the
+   * original stack trace.
+   */
+  private def startDaemon(): Unit = {
+    self.synchronized {
+      // Nothing to do when the daemon is already running.
+      if (daemon == null) {
+        try {
+          // Create and start the daemon
+          val daemonCommand = Arrays.asList(pythonExec, "-m", sedonaDaemonModule)
+          val builder = new ProcessBuilder(daemonCommand)
+          val artifactUUID = envVars.getOrElse("SPARK_JOB_ARTIFACT_UUID", "default")
+          if (artifactUUID != "default") {
+            val workDir = new File(SparkFiles.getRootDirectory(), artifactUUID)
+            workDir.mkdir()
+            builder.directory(workDir)
+          }
+          val daemonEnv = builder.environment()
+          daemonEnv.putAll(envVars.asJava)
+          daemonEnv.put("PYTHONPATH", pythonPath)
+          daemonEnv.put("PYTHON_WORKER_FACTORY_SECRET", authHelper.secret)
+          if (Utils.preferIPv6) {
+            daemonEnv.put("SPARK_PREFER_IPV6", "True")
+          }
+          // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
+          daemonEnv.put("PYTHONUNBUFFERED", "YES")
+          daemon = builder.start()
+
+          // The first four bytes the daemon prints are expected to be its listening port.
+          val daemonStdout = new DataInputStream(daemon.getInputStream)
+          try {
+            daemonPort = daemonStdout.readInt()
+          } catch {
+            case _: EOFException if daemon.isAlive =>
+              throw SparkCoreErrors.eofExceptionWhileReadPortNumberError(sedonaDaemonModule)
+            case _: EOFException =>
+              throw SparkCoreErrors.eofExceptionWhileReadPortNumberError(
+                sedonaDaemonModule,
+                Some(daemon.exitValue))
+          }
+
+          // test that the returned port number is within a valid range.
+          // note: this does not cover the case where the port number
+          // is arbitrary data but is also coincidentally within range
+          val portInRange = daemonPort >= 1 && daemonPort <= 0xffff
+          if (!portInRange) {
+            val badPortMessage = f"""
+              |Bad data in $sedonaDaemonModule's standard output. Invalid port number:
+              |  $daemonPort (0x$daemonPort%08x)
+              |Python command to execute the daemon was:
+              |  ${daemonCommand.asScala.mkString(" ")}
+              |Check that you don't have any unexpected modules or libraries in
+              |your PYTHONPATH:
+              |  $pythonPath
+              |Also, check if you have a sitecustomize.py module in your python path,
+              |or in your python installation, that is printing to standard output"""
+            throw new SparkException(badPortMessage.stripMargin)
+          }
+
+          // Redirect daemon stdout and stderr
+          redirectStreamsToStderr(daemonStdout, daemon.getErrorStream)
+        } catch {
+          case e: Exception =>
+            // If the daemon exists, wait for it to finish and get its stderr
+            val daemonStderr = Option(daemon)
+              .flatMap(Utils.getStderr(_, PROCESS_WAIT_TIMEOUT_MS))
+              .getOrElse("")
+
+            stopDaemon()
+
+            if (daemonStderr == "") {
+              throw e
+            } else {
+              val indentedStderr = daemonStderr.replace("\n", "\n  ")
+              val combinedMessage = s"""
+                |Error from python worker:
+                |  $indentedStderr
+                |PYTHONPATH was:
+                |  $pythonPath
+                |$e"""
+
+              // Append error message from python daemon, but keep original stack trace
+              val wrapped = new SparkException(combinedMessage.stripMargin)
+              wrapped.setStackTrace(e.getStackTrace)
+              throw wrapped
+            }
+        }
+
+        // Important: don't close daemon's stdin (daemon.getOutputStream) so it can correctly
+        // detect our disappearance.
+      }
+    }
+  }
+
+  /**
+   * Empties the idle-worker queue, closing each socket as it is removed. A failure to close
+   * one socket is logged and does not stop the remaining ones from being closed.
+   */
+  private def cleanupIdleWorkers(): Unit = {
+    @scala.annotation.tailrec
+    def drain(): Unit = {
+      if (idleWorkers.nonEmpty) {
+        val idleSocket = idleWorkers.dequeue()
+        try {
+          // the worker will exit after closing the socket
+          idleSocket.close()
+        } catch {
+          case e: Exception =>
+            logWarning("Failed to close worker socket", e)
+        }
+        drain()
+      }
+    }
+
+    drain()
+  }
+
+  /**
+   * Gives a worker back to the factory. In daemon mode the socket is queued for reuse and the
+   * last-activity timestamp is refreshed; otherwise the socket is simply closed.
+   */
+  def releaseWorker(worker: Socket): Unit = {
+    if (!useDaemon) {
+      // Cleanup the worker socket. This will also cause the Python worker to exit.
+      try {
+        worker.close()
+      } catch {
+        case e: Exception =>
+          logWarning("Failed to close worker socket", e)
+      }
+    } else {
+      self.synchronized {
+        lastActivityNs = System.nanoTime()
+        idleWorkers += worker
+      }
+    }
+  }
+
+  /**
+   * Terminates the worker behind `worker` and closes the socket.
+   *
+   * In daemon mode the worker's pid, if recorded, is written to the daemon's stdin so the
+   * daemon can kill it; otherwise the simple worker process, if recorded, is destroyed.
+   */
+  def stopWorker(worker: Socket): Unit = {
+    self.synchronized {
+      if (!useDaemon) {
+        for (process <- simpleWorkers.get(worker)) {
+          process.destroy()
+        }
+      } else if (daemon != null) {
+        for (pid <- daemonWorkers.get(worker)) {
+          // tell daemon to kill worker by pid
+          val daemonStdin = new DataOutputStream(daemon.getOutputStream)
+          daemonStdin.writeInt(pid)
+          daemonStdin.flush()
+          daemon.getOutputStream.flush()
+        }
+      }
+    }
+    worker.close()
+  }
+}
+
+private object SedonaPythonWorkerFactory {
+  val PROCESS_WAIT_TIMEOUT_MS = 10000 // handed to Utils.getStderr in startDaemon's failure path; presumably a wait limit in ms
+}
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
new file mode 100644
index 0000000000..6602967351
--- /dev/null
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.ipc.ArrowStreamWriter
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.arrow.ArrowWriter
+import org.apache.spark.sql.util.ArrowUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.{SparkEnv, TaskContext}
+
+import java.io.DataOutputStream
+import java.net.Socket
+
+/**
+ * Arrow input side of a Sedona Python runner: builds the writer thread that sends the UDF
+ * command, the geometry metadata and then the Arrow-encoded input to the worker.
+ */
+private[python] trait SedonaPythonArrowInput[IN] extends PythonArrowInput[IN] {
+  self: SedonaBasePythonRunner[IN, _] =>
+  protected override def newWriterThread(
+      env: SparkEnv,
+      worker: Socket,
+      inputIterator: Iterator[IN],
+      partitionIndex: Int,
+      context: TaskContext): WriterThread = {
+    new WriterThread(env, worker, inputIterator, partitionIndex, context) {
+
+      /**
+       * Command layout: runner metadata, the UDFs, the cast-to-WKB flag, the number of geometry
+       * fields, and then one (index, SRID) int pair per geometry field.
+       */
+      protected override def writeCommand(dataOut: DataOutputStream): Unit = {
+        handleMetadataBeforeExec(dataOut)
+        writeUDF(dataOut, funcs, argOffsets)
+
+        // if speedup is not available and we need to use casting
+        dataOut.writeBoolean(self.castGeometryToWKB)
+
+        // number of geometry fields, followed by each field's index and SRID
+        val geometryColumns = self.geometryFields
+        dataOut.writeInt(geometryColumns.length)
+        geometryColumns.foreach { column =>
+          dataOut.writeInt(column._1)
+          dataOut.writeInt(column._2)
+        }
+      }
+
+      /**
+       * Streams the whole input iterator as one Arrow stream; the vector root and its allocator
+       * are closed afterwards whether or not writing succeeded.
+       */
+      protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
+        val streamSchema =
+          ArrowUtils.toArrowSchema(schema, timeZoneId, errorOnDuplicatedFieldNames, largeVarTypes)
+        val childAllocator = ArrowUtils.rootAllocator.newChildAllocator(
+          s"stdout writer for $pythonExec",
+          0,
+          Long.MaxValue)
+        val vectorRoot = VectorSchemaRoot.create(streamSchema, childAllocator)
+
+        Utils.tryWithSafeFinally {
+          val streamWriter = new ArrowStreamWriter(vectorRoot, null, dataOut)
+          streamWriter.start()
+
+          writeIteratorToArrowStream(vectorRoot, streamWriter, dataOut, inputIterator)
+
+          streamWriter.end()
+        } {
+          vectorRoot.close()
+          childAllocator.close()
+        }
+      }
+    }
+  }
+}
+
+/**
+ * Input variant for runners whose input is an iterator of row batches: each inner iterator is
+ * written out as one Arrow record batch.
+ */
+private[python] trait SedonaBasicPythonArrowInput
+    extends SedonaPythonArrowInput[Iterator[InternalRow]] {
+  self: SedonaBasePythonRunner[Iterator[InternalRow], _] =>
+
+  /**
+   * Writes every batch of `inputIterator` through `writer` and adds the number of bytes each
+   * batch put on `dataOut` to the "pythonDataSent" metric.
+   */
+  protected def writeIteratorToArrowStream(
+      root: VectorSchemaRoot,
+      writer: ArrowStreamWriter,
+      dataOut: DataOutputStream,
+      inputIterator: Iterator[Iterator[InternalRow]]): Unit = {
+    val rowWriter = ArrowWriter.create(root)
+    while (inputIterator.hasNext) {
+      // Stream position before this batch, used to measure what the batch added.
+      val bytesBefore = dataOut.size()
+      val rows = inputIterator.next()
+
+      rows.foreach(row => rowWriter.write(row))
+
+      rowWriter.finish()
+      writer.writeBatch()
+      rowWriter.reset()
+      pythonMetrics("pythonDataSent") += (dataOut.size() - bytesBefore)
+    }
+  }
+}
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
new file mode 100644
index 0000000000..d031605260
--- /dev/null
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import java.io.DataInputStream
+import java.net.Socket
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.collection.JavaConverters._
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.ipc.ArrowStreamReader
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{BasePythonRunner, SpecialLengths}
+import org.apache.spark.internal.config.Python.PYTHON_WORKER_REUSE
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.ArrowUtils
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnVector, 
ColumnarBatch}
+
+/**
+ * Arrow output side of a Sedona Python runner: builds the iterator that decodes the worker's
+ * reply stream into `OUT` values, one per Arrow record batch.
+ */
+private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[_, OUT] =>
+
+  private val reuseWorker =
+    SparkEnv.get.conf.getBoolean(PYTHON_WORKER_REUSE.key, PYTHON_WORKER_REUSE.defaultValue.get)
+
+  protected def pythonMetrics: Map[String, SQLMetric]
+
+  protected def deserializeColumnarBatch(batch: ColumnarBatch, schema: StructType): OUT
+
+  protected def newReaderIterator(
+      stream: DataInputStream,
+      writerThread: WriterThread,
+      startTime: Long,
+      env: SparkEnv,
+      worker: Socket,
+      pid: Option[Int],
+      releasedOrClosed: AtomicBoolean,
+      context: TaskContext): Iterator[OUT] = {
+
+    new ReaderIterator(
+      stream,
+      writerThread,
+      startTime,
+      env,
+      worker,
+      pid,
+      releasedOrClosed,
+      context) {
+
+      private val allocator = ArrowUtils.rootAllocator.newChildAllocator(
+        s"stdin reader for $pythonExec",
+        0,
+        Long.MaxValue)
+
+      private var reader: ArrowStreamReader = _
+      private var root: VectorSchemaRoot = _
+      private var schema: StructType = _
+      private var vectors: Array[ColumnVector] = _
+      private var eos = false
+      private var nextObj: OUT = _
+
+      // Release Arrow resources when the task ends, however it ends.
+      context.addTaskCompletionListener[Unit] { _ =>
+        if (reader != null) {
+          reader.close(false)
+        }
+        allocator.close()
+      }
+
+      // False once the current Arrow stream has reported that it has no further batch.
+      private var batchLoaded = true
+
+      /**
+       * Reads the trailing marker and, when it is END_OF_STREAM and reuse is enabled, returns
+       * the worker to [[WorkerContext]]. Always marks this iterator as finished.
+       */
+      protected def handleEndOfDataSectionSedona(): Unit = {
+        // Check whether the worker is ready to be reused.
+        val workerFinishedCleanly = stream.readInt() == SpecialLengths.END_OF_STREAM
+        if (workerFinishedCleanly && reuseWorker && releasedOrClosed.compareAndSet(false, true)) {
+          WorkerContext.releasePythonWorker(pythonExec, envVars.asScala.toMap, worker)
+        }
+        eos = true
+      }
+
+      protected override def handleEndOfDataSection(): Unit = {
+        handleEndOfDataSectionSedona()
+      }
+
+      override def hasNext: Boolean = {
+        if (nextObj != null) {
+          true
+        } else if (eos) {
+          false
+        } else {
+          nextObj = read()
+          hasNext
+        }
+      }
+
+      override def next(): OUT = {
+        if (!hasNext) {
+          Iterator.empty.next()
+        } else {
+          val current = nextObj
+          nextObj = null.asInstanceOf[OUT]
+          current
+        }
+      }
+
+      /**
+       * Produces the next value: the next record batch of the open Arrow stream when there is
+       * one, otherwise whatever the next control marker on the raw stream calls for. Returns
+       * null after the end-of-data section has been handled.
+       */
+      protected override def read(): OUT = {
+        writerThread.exception.foreach { failure =>
+          throw failure
+        }
+        try {
+          if (reader == null || !batchLoaded) {
+            // No Arrow stream is open (or it is exhausted): the next int is a control marker.
+            stream.readInt() match {
+              case SpecialLengths.START_ARROW_STREAM =>
+                reader = new ArrowStreamReader(stream, allocator)
+                root = reader.getVectorSchemaRoot()
+                schema = ArrowUtils.fromArrowSchema(root.getSchema())
+                vectors = root
+                  .getFieldVectors()
+                  .asScala
+                  .map(fieldVector => new ArrowColumnVector(fieldVector))
+                  .toArray[ColumnVector]
+
+                read()
+              case SpecialLengths.TIMING_DATA =>
+                handleTimingData()
+                read()
+              case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+                throw handlePythonException()
+              case SpecialLengths.END_OF_DATA_SECTION =>
+                handleEndOfDataSection()
+                null.asInstanceOf[OUT]
+            }
+          } else {
+            val bytesBefore = reader.bytesRead()
+            batchLoaded = reader.loadNextBatch()
+            if (!batchLoaded) {
+              // Arrow stream exhausted: free its resources and go back to the control markers.
+              reader.close(false)
+              allocator.close()
+              read()
+            } else {
+              val batch = new ColumnarBatch(vectors)
+              val rowCount = root.getRowCount
+              batch.setNumRows(rowCount)
+              val bytesAfter = reader.bytesRead()
+              pythonMetrics("pythonNumRowsReceived") += rowCount
+              pythonMetrics("pythonDataReceived") += (bytesAfter - bytesBefore)
+              deserializeColumnarBatch(batch, schema)
+            }
+          }
+        } catch handleException
+      }
+    }
+  }
+}
+
+private[python] trait SedonaBasicPythonArrowOutput
+    extends SedonaPythonArrowOutput[ColumnarBatch] {
+  self: BasePythonRunner[_, ColumnarBatch] =>
+
+  protected def deserializeColumnarBatch(
+      batch: ColumnarBatch,
+      schema: StructType): ColumnarBatch = batch // pass-through: returns the batch as-is, schema is not used
+}
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala
new file mode 100644
index 0000000000..6411bec97e
--- /dev/null
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import java.net.Socket
+import scala.collection.mutable
+
+/**
+ * Registry of [[SedonaDBWorkerFactory]] instances, one per distinct (python executable,
+ * environment) pair. Every operation runs while holding this object's monitor.
+ */
+object WorkerContext {
+
+  private val pythonWorkers =
+    mutable.HashMap[(String, Map[String, String]), SedonaDBWorkerFactory]()
+
+  /** Creates the factory for this executable/environment on first use and asks it for a worker. */
+  def createPythonWorker(
+      pythonExec: String,
+      envVars: Map[String, String]): (java.net.Socket, Option[Int]) = synchronized {
+    val factory = pythonWorkers.getOrElseUpdate(
+      (pythonExec, envVars),
+      new SedonaDBWorkerFactory(pythonExec, envVars))
+    factory.create()
+  }
+
+  /** Stops `worker` through its factory; does nothing when no such factory is registered. */
+  def destroyPythonWorker(
+      pythonExec: String,
+      envVars: Map[String, String],
+      worker: Socket): Unit = synchronized {
+    pythonWorkers.get((pythonExec, envVars)) match {
+      case Some(factory) => factory.stopWorker(worker)
+      case None => ()
+    }
+  }
+
+  /** Releases `worker` through its factory; does nothing when no such factory is registered. */
+  def releasePythonWorker(
+      pythonExec: String,
+      envVars: Map[String, String],
+      worker: Socket): Unit = synchronized {
+    pythonWorkers.get((pythonExec, envVars)) match {
+      case Some(factory) => factory.releaseWorker(worker)
+      case None => ()
+    }
+  }
+
+}
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala
index 3d3301580c..3584cb01bd 100644
--- 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala
@@ -44,9 +44,8 @@ class ExtractSedonaUDFRule extends Rule[LogicalPlan] with 
Logging {
   }
 
   def isScalarPythonUDF(e: Expression): Boolean = {
-    e.isInstanceOf[PythonUDF] && e
-      .asInstanceOf[PythonUDF]
-      .evalType == PythonEvalType.SQL_SCALAR_SEDONA_UDF
+    e.isInstanceOf[PythonUDF] &&
+    PythonEvalType.evals.contains(e.asInstanceOf[PythonUDF].evalType)
   }
 
   private def collectEvaluableUDFsFromExpressions(
@@ -168,13 +167,12 @@ class ExtractSedonaUDFRule extends Rule[LogicalPlan] with 
Logging {
               evalTypes.mkString(","))
         }
         val evalType = evalTypes.head
-        val evaluation = evalType match {
-          case PythonEvalType.SQL_SCALAR_SEDONA_UDF =>
-            SedonaArrowEvalPython(validUdfs, resultAttrs, child, evalType)
-          case _ =>
-            throw new IllegalStateException("Unexpected UDF evalType")
+        if (!PythonEvalType.evals().contains(evalType)) {
+          throw new IllegalStateException(s"Unexpected UDF evalType: 
$evalType")
         }
 
+        val evaluation = SedonaArrowEvalPython(validUdfs, resultAttrs, child, 
evalType)
+
         attributeMap ++= 
validUdfs.map(canonicalizeDeterministic).zip(resultAttrs)
         evaluation
       } else {
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/SedonaArrowStrategy.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/SedonaArrowStrategy.scala
deleted file mode 100644
index a403fa6b9e..0000000000
--- 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/SedonaArrowStrategy.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.udf
-
-import org.apache.sedona.sql.UDF.PythonEvalType
-import org.apache.spark.api.python.ChainedPythonFunctions
-import org.apache.spark.{JobArtifactSet, TaskContext}
-import org.apache.spark.sql.Strategy
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, PythonUDF}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.python.{ArrowPythonRunner, 
BatchIterator, EvalPythonExec, PythonSQLMetrics}
-import org.apache.spark.sql.types.StructType
-
-import scala.collection.JavaConverters.asScalaIteratorConverter
-
-// We use custom Strategy to avoid Apache Spark assert on types, we
-// can consider extending this to support other engines working with
-// arrow data
-class SedonaArrowStrategy extends Strategy {
-  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case SedonaArrowEvalPython(udfs, output, child, evalType) =>
-      SedonaArrowEvalPythonExec(udfs, output, planLater(child), evalType) :: 
Nil
-    case _ => Nil
-  }
-}
-
-// It's modification og Apache Spark's ArrowEvalPythonExec, we remove the 
check on the types to allow geometry types
-// here, it's initial version to allow the vectorized udf for Sedona geometry 
types. We can consider extending this
-// to support other engines working with arrow data
-case class SedonaArrowEvalPythonExec(
-    udfs: Seq[PythonUDF],
-    resultAttrs: Seq[Attribute],
-    child: SparkPlan,
-    evalType: Int)
-    extends EvalPythonExec
-    with PythonSQLMetrics {
-
-  private val batchSize = conf.arrowMaxRecordsPerBatch
-  private val sessionLocalTimeZone = conf.sessionLocalTimeZone
-  private val largeVarTypes = conf.arrowUseLargeVarTypes
-  private val pythonRunnerConf = ArrowPythonRunner.getPythonRunnerConfMap(conf)
-  private[this] val jobArtifactUUID = 
JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
-
-  protected override def evaluate(
-      funcs: Seq[ChainedPythonFunctions],
-      argOffsets: Array[Array[Int]],
-      iter: Iterator[InternalRow],
-      schema: StructType,
-      context: TaskContext): Iterator[InternalRow] = {
-
-    val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else 
Iterator(iter)
-
-    val columnarBatchIter = new ArrowPythonRunner(
-      funcs,
-      evalType - PythonEvalType.SEDONA_UDF_TYPE_CONSTANT,
-      argOffsets,
-      schema,
-      sessionLocalTimeZone,
-      largeVarTypes,
-      pythonRunnerConf,
-      pythonMetrics,
-      jobArtifactUUID).compute(batchIter, context.partitionId(), context)
-
-    columnarBatchIter.flatMap { batch =>
-      batch.rowIterator.asScala
-    }
-  }
-
-  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
-    copy(child = newChild)
-}
diff --git 
a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala 
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index 28943ff11d..52deaa7b87 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -26,7 +26,6 @@ import org.scalatest.{BeforeAndAfterAll, FunSpec}
 import org.testcontainers.containers.MinIOContainer
 
 import java.io.FileInputStream
-
 import java.util.concurrent.ThreadLocalRandom
 
 trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
@@ -46,6 +45,9 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
     // We need to be explicit about broadcasting in tests.
     .config("sedona.join.autoBroadcastJoinThreshold", "-1")
     .config("spark.sql.extensions", 
"org.apache.sedona.sql.SedonaSqlExtensions")
+    .config("sedona.python.worker.udf.module", "sedona.spark.worker.worker")
+    .config("sedona.python.worker.udf.daemon.module", "sedonaworker.daemon")
+    .config("sedona.python.worker.daemon.enabled", "false")
     .config(keyParserExtension, ThreadLocalRandom.current().nextBoolean())
     .getOrCreate()
 
diff --git 
a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala 
b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
index 8d41848de9..1593340807 100644
--- 
a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
+++ 
b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.udf
 
 import org.apache.sedona.sql.TestBaseScala
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.udf.ScalarUDF.geoPandasScalaFunction
+import org.apache.spark.sql.functions.{col, expr, lit}
+import org.apache.spark.sql.udf.ScalarUDF.{geoPandasScalaFunction, 
sedonaDBGeometryToGeometryFunction}
 import org.locationtech.jts.io.WKTReader
 import org.scalatest.matchers.should.Matchers
 
@@ -35,7 +35,11 @@ class StrategySuite extends TestBaseScala with Matchers {
 
   import spark.implicits._
 
-  it("sedona geospatial UDF") {
+  it("sedona geospatial UDF - geopandas") {
+    if (!sparkSession.version.startsWith("3.5")) {
+      cancel("spark 3.5 is supported only.")
+    }
+
     val df = Seq(
       (1, "value", wktReader.read("POINT(21 52)")),
       (2, "value1", wktReader.read("POINT(20 50)")),
@@ -43,11 +47,14 @@ class StrategySuite extends TestBaseScala with Matchers {
       (4, "value3", wktReader.read("POINT(20 48)")),
       (5, "value4", wktReader.read("POINT(20 47)")))
       .toDF("id", "value", "geom")
+
+    val geopandasUDFDF = df
       .withColumn("geom_buffer", geoPandasScalaFunction(col("geom")))
 
-    df.count shouldEqual 5
+    geopandasUDFDF.count shouldEqual 5
 
-    df.selectExpr("ST_AsText(ST_ReducePrecision(geom_buffer, 2))")
+    geopandasUDFDF
+      .selectExpr("ST_AsText(ST_ReducePrecision(geom_buffer, 2))")
       .as[String]
       .collect() should contain theSameElementsAs Seq(
       "POLYGON ((20 51, 20 53, 22 53, 22 51, 20 51))",
@@ -56,4 +63,29 @@ class StrategySuite extends TestBaseScala with Matchers {
       "POLYGON ((19 47, 19 49, 21 49, 21 47, 19 47))",
       "POLYGON ((19 46, 19 48, 21 48, 21 46, 19 46))")
   }
+
+  it("sedona geospatial UDF - sedona db") { // Runs sedonaDBGeometryToGeometryFunction over five points and checks the summed centroid X.
+    if (!sparkSession.version.startsWith("3.5")) {
+      cancel("spark 3.5 is supported only.") // cancels the test instead of failing it on any other Spark version
+    }
+
+    val df = Seq(
+      (1, "value", wktReader.read("POINT(21 52)")),
+      (2, "value1", wktReader.read("POINT(20 50)")),
+      (3, "value2", wktReader.read("POINT(20 49)")),
+      (4, "value3", wktReader.read("POINT(20 48)")),
+      (5, "value4", wktReader.read("POINT(20 47)")))
+      .toDF("id", "value", "geom")
+
+    val dfVectorized = df
+      .withColumn("geometry", expr("ST_SetSRID(geom, '4326')")) // the SRID is passed as the string literal '4326'
+      .select(sedonaDBGeometryToGeometryFunction(col("geometry"), 
lit(100)).alias("geom")) // the UDF receives the geometry column plus the constant 100
+
+    dfVectorized
+      .selectExpr("ST_X(ST_Centroid(geom)) AS x")
+      .selectExpr("sum(x)")
+      .as[Double]
+      .collect()
+      .head shouldEqual 101 // 21 + 20 + 20 + 20 + 20, the X coordinates of the five input points
+  }
 }
diff --git 
a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
 
b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
index c0a2d8f260..06e05fd750 100644
--- 
a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
+++ 
b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
@@ -19,11 +19,13 @@
 package org.apache.spark.sql.udf
 
 import org.apache.sedona.sql.UDF
-import org.apache.spark.TestUtils
+import org.apache.spark.{SparkEnv, TestUtils}
 import org.apache.spark.api.python._
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.internal.config.Python.{PYTHON_USE_DAEMON, 
PYTHON_WORKER_MODULE}
 import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import org.apache.spark.sql.types.FloatType
 import org.apache.spark.util.Utils
 
 import java.io.File
@@ -54,7 +56,7 @@ object ScalarUDF {
 
   private lazy val isPythonAvailable: Boolean = 
TestUtils.testCommandAvailable(pythonExec)
 
-  lazy val pythonVer: String = if (isPythonAvailable) {
+  val pythonVer: String = if (isPythonAvailable) {
     Process(
       Seq(pythonExec, "-c", "import sys; print('%d.%d' % 
sys.version_info[:2])"),
       None,
@@ -70,31 +72,86 @@ object ScalarUDF {
     finally Utils.deleteRecursively(path)
   }
 
-  val pandasFunc: Array[Byte] = {
+  val additionalModule = 
"spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf" // repository-relative path of the spark-3.5 udf test sources; NOTE(review): no use of this val is visible here - confirm it is needed
+
+  val vectorizedFunction: Array[Byte] = { // Cloudpickled (apply_function_on_number, FloatType()) pair; the Python function returns x + 1.0.
+    var binaryPandasFunc: Array[Byte] = null
+    withTempPath { path => // the Python child process writes the pickle to this path and it is read back below
+      Process(
+        Seq(
+          pythonExec,
+          "-c",
+          f"""
+             |from pyspark.sql.types import FloatType
+             |from pyspark.serializers import CloudPickleSerializer
+             |f = open('$path', 'wb');
+             |
+             |def apply_function_on_number(x):
+             |    return x + 1.0
+             |f.write(CloudPickleSerializer().dumps((apply_function_on_number, 
FloatType())))
+             |""".stripMargin),
+        None,
+        "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!! // .!! blocks until the process exits and throws on a non-zero exit code
+      binaryPandasFunc = Files.readAllBytes(path.toPath)
+    }
+    assert(binaryPandasFunc != null)
+    binaryPandasFunc
+  }
+
+  val sedonaDBGeometryToGeometryFunctionBytes: Array[Byte] = { // Cloudpickled (zero-arg lambda returning geometry_udf, GeometryType()) pair.
+    var binaryPandasFunc: Array[Byte] = null
+    withTempPath { path => // the Python child process writes the pickle to this path and it is read back below
+      Process(
+        Seq(
+          pythonExec,
+          "-c",
+          f"""
+             |import pyarrow as pa
+             |import shapely
+             |import geoarrow.pyarrow as ga
+             |from sedonadb import udf
+             |from sedona.sql.types import GeometryType
+             |from pyspark.serializers import CloudPickleSerializer
+             |from pyspark.sql.types import DoubleType, IntegerType
+             |from sedonadb import udf as sedona_udf_module
+             |from sedona.spark.utils.geometry_serde import from_sedona
+             |from sedona.spark.utils.geometry_serde import to_sedona
+             |import numpy as np
+             |
+             |@sedona_udf_module.arrow_udf(ga.wkb(), 
[sedona_udf_module.BINARY, udf.NUMERIC])
+             |def geometry_udf(geom, distance):
+             |    geom_wkb = pa.array(geom.storage.to_array())
+             |    geometry_array = np.asarray(geom_wkb, dtype=object)
+             |    geom = from_sedona(geometry_array)
+             |    return pa.array(to_sedona(geom))
+             |
+             |f = open('$path', 'wb');
+             |f.write(CloudPickleSerializer().dumps((lambda: geometry_udf, 
GeometryType())))
+             |""".stripMargin), // NOTE(review): the script imports sedonadb.udf twice (as udf and as sedona_udf_module), never uses shapely/DoubleType/IntegerType, and geometry_udf ignores its distance argument
+        None,
+        "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!! // .!! blocks until the process exits and throws on a non-zero exit code
+      binaryPandasFunc = Files.readAllBytes(path.toPath)
+    }
+    assert(binaryPandasFunc != null)
+    binaryPandasFunc
+  }
+
+  val geopandasNonGeometryToGeometryFunction: Array[Byte] = {
     var binaryPandasFunc: Array[Byte] = null
     withTempPath { path =>
-      println(path)
       Process(
         Seq(
           pythonExec,
           "-c",
           f"""
-            |from pyspark.sql.types import IntegerType
-            |from shapely.geometry import Point
-            |from sedona.sql.types import GeometryType
-            |from pyspark.serializers import CloudPickleSerializer
-            |from sedona.utils import geometry_serde
-            |from shapely import box
-            |f = open('$path', 'wb');
-            |def w(x):
-            |    def apply_function(w):
-            |        geom, offset = geometry_serde.deserialize(w)
-            |        bounds = geom.buffer(1).bounds
-            |        x = box(*bounds)
-            |        return geometry_serde.serialize(x)
-            |    return x.apply(apply_function)
-            |f.write(CloudPickleSerializer().dumps((w, GeometryType())))
-            |""".stripMargin),
+               |from sedona.sql.types import GeometryType
+               |from shapely.wkt import loads
+               |from pyspark.serializers import CloudPickleSerializer
+               |f = open('$path', 'wb');
+               |def apply_geopandas(x):
+               |    return x.apply(lambda wkt: loads(wkt).buffer(1))
+               |f.write(CloudPickleSerializer().dumps((apply_geopandas, 
GeometryType())))
+               |""".stripMargin),
         None,
         "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
       binaryPandasFunc = Files.readAllBytes(path.toPath)
@@ -104,7 +161,39 @@ object ScalarUDF {
   }
 
   private val workerEnv = new java.util.HashMap[String, String]()
-  workerEnv.put("PYTHONPATH", s"$pysparkPythonPath:$pythonPath")
+
+  val pandasFunc: Array[Byte] = { // Cloudpickled (w, GeometryType()) pair; w applies apply_function to each element of its input via x.apply.
+    var binaryPandasFunc: Array[Byte] = null
+    withTempPath { path => // the Python child process writes the pickle to this path and it is read back below
+      println(path) // prints the temp path to stdout; NOTE(review): looks like leftover debug output - confirm it is wanted
+      Process(
+        Seq(
+          pythonExec,
+          "-c",
+          f"""
+             |from pyspark.sql.types import IntegerType
+             |from shapely.geometry import Point
+             |from sedona.sql.types import GeometryType
+             |from pyspark.serializers import CloudPickleSerializer
+             |from sedona.utils import geometry_serde
+             |from shapely import box
+             |f = open('$path', 'wb');
+             |def w(x):
+             |    def apply_function(w):
+             |        geom, offset = geometry_serde.deserialize(w)
+             |        bounds = geom.buffer(1).bounds
+             |        x = box(*bounds)
+             |        return geometry_serde.serialize(x)
+             |    return x.apply(apply_function)
+             |f.write(CloudPickleSerializer().dumps((w, GeometryType())))
+             |""".stripMargin), // per element the script deserializes the value, buffers it by 1 and returns the serialized box of the buffer's bounds
+        None,
+        "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!! // .!! blocks until the process exits and throws on a non-zero exit code
+      binaryPandasFunc = Files.readAllBytes(path.toPath)
+    }
+    assert(binaryPandasFunc != null)
+    binaryPandasFunc
+  }
 
   val geoPandasScalaFunction: UserDefinedPythonFunction = 
UserDefinedPythonFunction(
     name = "geospatial_udf",
@@ -119,4 +208,19 @@ object ScalarUDF {
     dataType = GeometryUDT,
     pythonEvalType = UDF.PythonEvalType.SQL_SCALAR_SEDONA_UDF,
     udfDeterministic = true)
+
+  val sedonaDBGeometryToGeometryFunction: UserDefinedPythonFunction = 
UserDefinedPythonFunction( // Spark-side wrapper around the pickled payload in sedonaDBGeometryToGeometryFunctionBytes.
+    name = "geospatial_udf", // NOTE(review): same name as geoPandasScalaFunction - confirm the duplicate is intended
+    func = SimplePythonFunction(
+      command = sedonaDBGeometryToGeometryFunctionBytes,
+      envVars = workerEnv.clone().asInstanceOf[java.util.Map[String, String]], // NOTE(review): confirm workerEnv still carries PYTHONPATH - no put(...) on it is visible nearby
+      pythonIncludes = List.empty[String].asJava,
+      pythonExec = pythonExec,
+      pythonVer = pythonVer,
+      broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava,
+      accumulator = null),
+    dataType = GeometryUDT, // the UDF is declared to return Sedona geometries
+    pythonEvalType = UDF.PythonEvalType.SQL_SCALAR_SEDONA_DB_SPEEDUP_UDF, // differs from geoPandasScalaFunction, which uses SQL_SCALAR_SEDONA_UDF
+    udfDeterministic = true)
+
+
 }
diff --git 
a/spark/spark-4.0/src/main/scala/org/apache/spark/sql/udf/SedonaArrowStrategy.scala
 
b/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala
similarity index 89%
rename from 
spark/spark-4.0/src/main/scala/org/apache/spark/sql/udf/SedonaArrowStrategy.scala
rename to 
spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala
index 21e389ee56..002bb2b167 100644
--- 
a/spark/spark-4.0/src/main/scala/org/apache/spark/sql/udf/SedonaArrowStrategy.scala
+++ 
b/spark/spark-4.0/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.spark.sql.udf
+package org.apache.spark.sql.execution.python
 
 import org.apache.sedona.sql.UDF.PythonEvalType
 import org.apache.spark.api.python.ChainedPythonFunctions
@@ -25,11 +25,8 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, PythonUDF}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
-import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.python.SedonaArrowEvalPythonExec
-import org.apache.spark.sql.types.StructType
-
-import scala.collection.JavaConverters.asScalaIteratorConverter
+import org.apache.spark.sql.udf.SedonaArrowEvalPython
 
 // We use custom Strategy to avoid Apache Spark assert on types, we
 // can consider extending this to support other engines working with
diff --git 
a/spark/spark-4.0/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala
 
b/spark/spark-4.0/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala
index 3d3301580c..3584cb01bd 100644
--- 
a/spark/spark-4.0/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala
+++ 
b/spark/spark-4.0/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala
@@ -44,9 +44,8 @@ class ExtractSedonaUDFRule extends Rule[LogicalPlan] with 
Logging {
   }
 
   def isScalarPythonUDF(e: Expression): Boolean = {
-    e.isInstanceOf[PythonUDF] && e
-      .asInstanceOf[PythonUDF]
-      .evalType == PythonEvalType.SQL_SCALAR_SEDONA_UDF
+    e.isInstanceOf[PythonUDF] &&
+    PythonEvalType.evals.contains(e.asInstanceOf[PythonUDF].evalType)
   }
 
   private def collectEvaluableUDFsFromExpressions(
@@ -168,13 +167,12 @@ class ExtractSedonaUDFRule extends Rule[LogicalPlan] with 
Logging {
               evalTypes.mkString(","))
         }
         val evalType = evalTypes.head
-        val evaluation = evalType match {
-          case PythonEvalType.SQL_SCALAR_SEDONA_UDF =>
-            SedonaArrowEvalPython(validUdfs, resultAttrs, child, evalType)
-          case _ =>
-            throw new IllegalStateException("Unexpected UDF evalType")
+        if (!PythonEvalType.evals().contains(evalType)) {
+          throw new IllegalStateException(s"Unexpected UDF evalType: 
$evalType")
         }
 
+        val evaluation = SedonaArrowEvalPython(validUdfs, resultAttrs, child, 
evalType)
+
         attributeMap ++= 
validUdfs.map(canonicalizeDeterministic).zip(resultAttrs)
         evaluation
       } else {

Reply via email to