This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch arrow-worker
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 32051db5a10450f0ede4d363550e1bb7a4a681cd
Author: pawelkocinski <[email protected]>
AuthorDate: Sat Aug 2 14:42:20 2025 +0200

    SEDONA-738 Fix unit tests.
---
 sedonaworker/serializer.py | 135 ++++++---------------------------------------
 sedonaworker/worker.py     | 123 ++++++++---------------------------------
 2 files changed, 42 insertions(+), 216 deletions(-)

diff --git a/sedonaworker/serializer.py b/sedonaworker/serializer.py
index 0f85344f86..229c06ce04 100644
--- a/sedonaworker/serializer.py
+++ b/sedonaworker/serializer.py
@@ -1,54 +1,28 @@
 from pyspark.sql.pandas.serializers import ArrowStreamPandasSerializer, ArrowStreamSerializer
 from pyspark.errors import PySparkTypeError, PySparkValueError
 import struct
-from pyspark.sql.pandas.types import (
-    from_arrow_type,
-    _create_converter_from_pandas,
-)
+
+from pyspark.serializers import write_int
 
 def write_int(value, stream):
     stream.write(struct.pack("!i", value))
 
 class SpecialLengths:
-    END_OF_DATA_SECTION = -1
-    PYTHON_EXCEPTION_THROWN = -2
-    TIMING_DATA = -3
-    END_OF_STREAM = -4
-    NULL = -5
     START_ARROW_STREAM = -6
 
 
 class SedonaArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
-    """
-    Serializer used by Python worker to evaluate Pandas UDFs
-    """
-
-    def __init__(
-            self,
-            timezone,
-            safecheck,
-            assign_cols_by_name,
-            df_for_struct=False,
-            struct_in_pandas="dict",
-            ndarray_as_list=False,
-            arrow_cast=False,
-    ):
+    def __init__(self, timezone, safecheck, assign_cols_by_name):
         super(SedonaArrowStreamPandasUDFSerializer, self).__init__(timezone, safecheck)
         self._assign_cols_by_name = assign_cols_by_name
-        self._df_for_struct = df_for_struct
-        self._struct_in_pandas = struct_in_pandas
-        self._ndarray_as_list = ndarray_as_list
-        self._arrow_cast = arrow_cast
 
     def load_stream(self, stream):
-        """
-        Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
-        """
         import geoarrow.pyarrow as ga
-        batches = super(ArrowStreamPandasSerializer, self).load_stream(stream)
         import pyarrow as pa
+
+        batches = super(ArrowStreamPandasSerializer, self).load_stream(stream)
         for batch in batches:
-            table = pa.Table.from_batches([batch])
+            table = pa.Table.from_batches(batches=[batch])
             data = []
 
             for c in table.itercolumns():
@@ -62,98 +36,33 @@ class SedonaArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
             yield data
 
     def _create_batch(self, series):
-        """
-        Create an Arrow record batch from the given pandas.Series pandas.DataFrame
-        or list of Series or DataFrame, with optional type.
-
-        Parameters
-        ----------
-        series : pandas.Series or pandas.DataFrame or list
-            A single series or dataframe, list of series or dataframe,
-            or list of (series or dataframe, arrow_type)
-
-        Returns
-        -------
-        pyarrow.RecordBatch
-            Arrow RecordBatch
-        """
         import pyarrow as pa
-        import geopandas as gpd
-        from shapely.geometry.base import BaseGeometry
-        # Make input conform to [(series1, type1), (series2, type2), ...]
-        if not isinstance(series, (list, tuple)) or (
-                len(series) == 2 and isinstance(series[1], pa.DataType)
-        ):
-            series = [series]
+
         series = ((s, None) if not isinstance(s, (list, tuple)) else s for s in series)
 
         arrs = []
         for s, t in series:
-            # TODO here we should look into the return type
-            first_element = s.iloc[0]
-            if isinstance(first_element, BaseGeometry):
-                arrs.append(self._create_array(gpd.GeoSeries(s), t, arrow_cast=self._arrow_cast))
-                continue
+            arrs.append(self._create_array(s, t))
 
-            arrs.append(self._create_array(s, t, arrow_cast=self._arrow_cast))
         return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))])
 
-    def _create_array(self, series, arrow_type, spark_type=None, arrow_cast=False):
-        """
-        Create an Arrow Array from the given pandas.Series and optional type.
-
-        Parameters
-        ----------
-        series : pandas.Series
-            A single series
-        arrow_type : pyarrow.DataType, optional
-            If None, pyarrow's inferred type will be used
-        spark_type : DataType, optional
-            If None, spark type converted from arrow_type will be used
-        arrow_cast: bool, optional
-            Whether to apply Arrow casting when the user-specified return type mismatches the
-            actual return values.
-
-        Returns
-        -------
-        pyarrow.Array
-        """
+    def _create_array(self, series, arrow_type):
         import pyarrow as pa
-        from pandas.api.types import is_categorical_dtype
-        if is_categorical_dtype(series.dtype):
-            series = series.astype(series.dtypes.categories.dtype)
-        if arrow_type is not None:
-            dt = spark_type or from_arrow_type(arrow_type, prefer_timestamp_ntz=True)
-            # TODO(SPARK-43579): cache the converter for reuse
-            conv = _create_converter_from_pandas(
-                dt, timezone=self._timezone, error_on_duplicated_field_names=False
-            )
-            series = conv(series)
+        import geopandas as gpd
 
         if hasattr(series.array, "__arrow_array__"):
             mask = None
         else:
             mask = series.isnull()
+
         try:
-            try:
-                import geopandas as gpd
-                if isinstance(series, gpd.GeoSeries):
-                    import geoarrow.pyarrow as ga
-                    # If the series is a GeoSeries, convert it to an Arrow array using geoarrow
-                    return ga.array(series)
-
-                array = pa.Array.from_pandas(
-                    series, mask=mask, type=arrow_type, safe=self._safecheck
-                )
-
-                return array
-            except pa.lib.ArrowInvalid:
-                if arrow_cast:
-                    return pa.Array.from_pandas(series, mask=mask).cast(
-                        target_type=arrow_type, safe=self._safecheck
-                    )
-                else:
-                    raise
+            if isinstance(series, gpd.GeoSeries):
+                import geoarrow.pyarrow as ga
+                # If the series is a GeoSeries, convert it to an Arrow array using geoarrow
+                return ga.array(series)
+
+            array = pa.Array.from_pandas(series, mask=mask, type=arrow_type)
+            return array
         except TypeError as e:
             error_msg = (
                 "Exception thrown when converting pandas.Series (%s) "
@@ -165,13 +74,6 @@ class SedonaArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
                 "Exception thrown when converting pandas.Series (%s) "
                 "with name '%s' to Arrow Array (%s)."
             )
-            if self._safecheck:
-                error_msg = error_msg + (
-                    " It can be caused by overflows or other "
-                    "unsafe conversions warned by Arrow. Arrow safe type check 
"
-                    "can be disabled by using SQL config "
-                    "`spark.sql.execution.pandas.convertToArrowArraySafely`."
-                )
             raise PySparkValueError(error_msg % (series.dtype, series.name, arrow_type)) from e
 
     def dump_stream(self, iterator, stream):
@@ -195,5 +97,4 @@ class SedonaArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
     def __repr__(self):
         return "ArrowStreamPandasUDFSerializer"
 
-from pyspark.serializers import Serializer, write_int
 
diff --git a/sedonaworker/worker.py b/sedonaworker/worker.py
index 2904fab764..98ee38242b 100644
--- a/sedonaworker/worker.py
+++ b/sedonaworker/worker.py
@@ -18,19 +18,14 @@
 """
 Worker that receives input from Piped RDD.
 """
-import logging
 import os
 import sys
 import time
-from inspect import currentframe, getframeinfo, getfullargspec
+from inspect import currentframe, getframeinfo
 import importlib
-import json
-from io import BufferedRWPair
-from typing import Any, Iterable, Iterator
 
 from sedonaworker.serializer import SedonaArrowStreamPandasUDFSerializer
 
-# 'resource' is a Unix specific module.
 has_resource_module = True
 try:
     import resource
@@ -57,13 +52,10 @@ from pyspark.serializers import (
     SpecialLengths,
     UTF8Deserializer,
     CPickleSerializer,
-    BatchedSerializer,
-)
-from pyspark.sql.pandas.serializers import (
-    ArrowStreamPandasUDFSerializer, ArrowStreamSerializer
 )
+
 from pyspark.sql.pandas.types import to_arrow_type
-from pyspark.sql.types import BinaryType, StringType, StructType, _parse_datatype_json_string
+from pyspark.sql.types import StructType
 from pyspark.util import fail_on_stopiteration, try_simplify_traceback
 from pyspark import shuffle
 from pyspark.errors import PySparkRuntimeError, PySparkTypeError
@@ -72,51 +64,6 @@ pickleSer = CPickleSerializer()
 utf8_deserializer = UTF8Deserializer()
 
 
-class SedonaArrowStreamUDFSerializer(ArrowStreamSerializer):
-    """
-    Same as :class:`ArrowStreamSerializer` but it flattens the struct to Arrow record batch
-    for applying each function with the raw record arrow batch. See also `DataFrame.mapInArrow`.
-    """
-
-    def load_stream(self, stream):
-        """
-        Flatten the struct into Arrow's record batches.
-        """
-        import pyarrow as pa
-
-        batches = super(SedonaArrowStreamUDFSerializer, self).load_stream(stream)
-        for batch in batches:
-            struct = batch.column(0)
-            yield [pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))]
-
-    def dump_stream(self, iterator, stream):
-        """
-        Override because Pandas UDFs require a START_ARROW_STREAM before the Arrow stream is sent.
-        This should be sent after creating the first record batch so in case of an error, it can
-        be sent back to the JVM before the Arrow stream starts.
-        """
-        import pyarrow as pa
-
-        def wrap_and_init_stream():
-            should_write_start_length = True
-            for batch, _ in iterator:
-                assert isinstance(batch, pa.RecordBatch)
-
-                # Wrap the root struct
-                struct = pa.StructArray.from_arrays(
-                    batch.columns, fields=pa.struct(list(batch.schema))
-                )
-                batch = pa.RecordBatch.from_arrays([struct], ["_0"])
-
-                # Write the first record batch with initialization.
-                if should_write_start_length:
-                    write_int(SpecialLengths.START_ARROW_STREAM, stream)
-                    should_write_start_length = False
-                yield batch
-
-        return super(SedonaArrowStreamUDFSerializer, self).dump_stream(wrap_and_init_stream(), stream)
-
-
 def report_times(outfile, boot, init, finish):
     write_int(SpecialLengths.TIMING_DATA, outfile)
     write_long(int(1000 * boot), outfile)
@@ -185,19 +132,11 @@ def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index):
         else:
             chained_func = chain(chained_func, f)
 
-    if eval_type in (
-            PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
-            PythonEvalType.SQL_ARROW_BATCHED_UDF,
-    ):
-        func = chained_func
-    else:
-        # make sure StopIteration's raised in the user code are not ignored
-        # when they are processed in a for loop, raise them as RuntimeError's instead
-        func = fail_on_stopiteration(chained_func)
+    func = fail_on_stopiteration(chained_func)
 
     # the last returnType will be the return type of UDF
     if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
-        return arg_offsets, wrap_scalar_pandas_udf(func, return_type)
+        return arg_offsets, wrap_scalar_pandas_udf(func, return_type), return_type
     else:
         raise ValueError("Unknown eval type: {}".format(eval_type))
 
@@ -214,6 +153,8 @@ def assign_cols_by_name(runner_conf):
 
 
 def read_udfs(pickleSer, infile, eval_type):
+    from sedona.sql.types import GeometryType
+    import geopandas as gpd
     runner_conf = {}
 
     # Load conf used for pandas_udf evaluation
@@ -223,30 +164,12 @@ def read_udfs(pickleSer, infile, eval_type):
         v = utf8_deserializer.loads(infile)
         runner_conf[k] = v
 
-    # state_object_schema = None
-    # if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
-    #     state_object_schema = StructType.fromJson(json.loads(utf8_deserializer.loads(infile)))
-
-    # NOTE: if timezone is set here, that implies respectSessionTimeZone is True
     timezone = runner_conf.get("spark.sql.session.timeZone", None)
     safecheck = (
             runner_conf.get("spark.sql.execution.pandas.convertToArrowArraySafely", "false").lower()
             == "true"
     )
 
-    df_for_struct = (
-            eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF
-            or eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF
-            or eval_type == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF
-    )
-    # Arrow-optimized Python UDF takes a struct type argument as a Row
-    struct_in_pandas = (
-        "row" if eval_type == PythonEvalType.SQL_ARROW_BATCHED_UDF else "dict"
-    )
-    ndarray_as_list = eval_type == PythonEvalType.SQL_ARROW_BATCHED_UDF
-    # Arrow-optimized Python UDF uses explicit Arrow cast for type coercion
-    arrow_cast = eval_type == PythonEvalType.SQL_ARROW_BATCHED_UDF
-
     num_udfs = read_int(infile)
 
     udfs = []
@@ -254,13 +177,24 @@ def read_udfs(pickleSer, infile, eval_type):
         udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
 
     def mapper(a):
-        result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
-        # In the special case of a single UDF this will return a single result rather
-        # than a tuple of results; this is the format that the JVM side expects.
-        if len(result) == 1:
-            return result[0]
+        results = []
+
+        for (arg_offsets, f, return_type) in udfs:
+            result = f(*[a[o] for o in arg_offsets])
+            if isinstance(return_type, GeometryType):
+                results.append((
+                    gpd.GeoSeries(result[0]),
+                    result[1],
+                ))
+
+                continue
+
+            results.append(result)
+
+        if len(results) == 1:
+            return results[0]
         else:
-            return result
+            return results
 
     def func(_, it):
         return map(mapper, it)
@@ -269,10 +203,6 @@ def read_udfs(pickleSer, infile, eval_type):
         timezone,
         safecheck,
         assign_cols_by_name(runner_conf),
-        df_for_struct,
-        struct_in_pandas,
-        ndarray_as_list,
-        arrow_cast,
     )
 
     # profiling is not supported for UDF
@@ -493,11 +423,6 @@ if __name__ == "__main__":
     java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
     auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"]
     (sock_file, _) = local_connect_and_auth(java_port, auth_secret)
-    # TODO: Remove the following two lines and use `Process.pid()` when we drop JDK 8.
     write_int(os.getpid(), sock_file)
     sock_file.flush()
     main(sock_file, sock_file)
-
-
-class GeoArrowLoader:
-    pass

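As context for the worker change above: read_single_udf now returns a 3-tuple
(arg_offsets, wrapped_func, return_type), and the mapper uses the extra return_type
to wrap geometry results in a GeoSeries before they reach the serializer. Below is a
minimal standalone sketch of that flow, not part of the commit; the fake UDF and its
(data, arrow_type) return shape are stand-ins for what wrap_scalar_pandas_udf
produces, and it assumes sedona, geopandas and shapely are importable.

    import geopandas as gpd
    from shapely.geometry import Point
    from sedona.sql.types import GeometryType

    def fake_wrapped_udf(xs):
        # Stand-in for a wrapped scalar pandas UDF: returns (result, arrow_return_type).
        return [Point(x, x) for x in xs], None

    # The 3-tuple shape read_single_udf now yields: (arg_offsets, wrapped_func, return_type).
    udfs = [((0,), fake_wrapped_udf, GeometryType())]

    def mapper(a):
        results = []
        for arg_offsets, f, return_type in udfs:
            result = f(*[a[o] for o in arg_offsets])
            if isinstance(return_type, GeometryType):
                # Geometry results are re-wrapped as a GeoSeries so the serializer can
                # route them through geoarrow; result[1] carries the arrow return type.
                results.append((gpd.GeoSeries(result[0]), result[1]))
                continue
            results.append(result)
        return results[0] if len(results) == 1 else results

    column_0 = [0.0, 1.0, 2.0]                 # pretend first column of an incoming batch
    geo_series, arrow_type = mapper([column_0])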