This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch arrow-worker
in repository https://gitbox.apache.org/repos/asf/sedona.git
commit 32051db5a10450f0ede4d363550e1bb7a4a681cd
Author: pawelkocinski <[email protected]>
AuthorDate: Sat Aug 2 14:42:20 2025 +0200

    SEDONA-738 Fix unit tests.
---
 sedonaworker/serializer.py | 135 ++++++---------------------------------------
 sedonaworker/worker.py     | 123 ++++++++---------------------------------
 2 files changed, 42 insertions(+), 216 deletions(-)

diff --git a/sedonaworker/serializer.py b/sedonaworker/serializer.py
index 0f85344f86..229c06ce04 100644
--- a/sedonaworker/serializer.py
+++ b/sedonaworker/serializer.py
@@ -1,54 +1,28 @@
 from pyspark.sql.pandas.serializers import ArrowStreamPandasSerializer, ArrowStreamSerializer
 from pyspark.errors import PySparkTypeError, PySparkValueError
 import struct
-from pyspark.sql.pandas.types import (
-    from_arrow_type,
-    _create_converter_from_pandas,
-)
+
+from pyspark.serializers import write_int


 def write_int(value, stream):
     stream.write(struct.pack("!i", value))


 class SpecialLengths:
-    END_OF_DATA_SECTION = -1
-    PYTHON_EXCEPTION_THROWN = -2
-    TIMING_DATA = -3
-    END_OF_STREAM = -4
-    NULL = -5
     START_ARROW_STREAM = -6


 class SedonaArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
-    """
-    Serializer used by Python worker to evaluate Pandas UDFs
-    """
-
-    def __init__(
-        self,
-        timezone,
-        safecheck,
-        assign_cols_by_name,
-        df_for_struct=False,
-        struct_in_pandas="dict",
-        ndarray_as_list=False,
-        arrow_cast=False,
-    ):
+    def __init__(self, timezone, safecheck, assign_cols_by_name):
         super(SedonaArrowStreamPandasUDFSerializer, self).__init__(timezone, safecheck)
         self._assign_cols_by_name = assign_cols_by_name
-        self._df_for_struct = df_for_struct
-        self._struct_in_pandas = struct_in_pandas
-        self._ndarray_as_list = ndarray_as_list
-        self._arrow_cast = arrow_cast

     def load_stream(self, stream):
-        """
-        Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
-        """
         import geoarrow.pyarrow as ga
-        batches = super(ArrowStreamPandasSerializer, self).load_stream(stream)
         import pyarrow as pa
+
+        batches = super(ArrowStreamPandasSerializer, self).load_stream(stream)
         for batch in batches:
-            table = pa.Table.from_batches([batch])
+            table = pa.Table.from_batches(batches=[batch])

             data = []
             for c in table.itercolumns():
@@ -62,98 +36,33 @@ class SedonaArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):

             yield data

     def _create_batch(self, series):
-        """
-        Create an Arrow record batch from the given pandas.Series pandas.DataFrame
-        or list of Series or DataFrame, with optional type.
-
-        Parameters
-        ----------
-        series : pandas.Series or pandas.DataFrame or list
-            A single series or dataframe, list of series or dataframe,
-            or list of (series or dataframe, arrow_type)
-
-        Returns
-        -------
-        pyarrow.RecordBatch
-            Arrow RecordBatch
-        """
         import pyarrow as pa
-        import geopandas as gpd
-        from shapely.geometry.base import BaseGeometry

-        # Make input conform to [(series1, type1), (series2, type2), ...]
-        if not isinstance(series, (list, tuple)) or (
-            len(series) == 2 and isinstance(series[1], pa.DataType)
-        ):
-            series = [series]
+        series = ((s, None) if not isinstance(s, (list, tuple)) else s for s in series)

         arrs = []
         for s, t in series:
-            # TODO here we should look into the return type
-            first_element = s.iloc[0]
-            if isinstance(first_element, BaseGeometry):
-                arrs.append(self._create_array(gpd.GeoSeries(s), t, arrow_cast=self._arrow_cast))
-                continue
+            arrs.append(self._create_array(s, t))

-            arrs.append(self._create_array(s, t, arrow_cast=self._arrow_cast))
         return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))])

-    def _create_array(self, series, arrow_type, spark_type=None, arrow_cast=False):
-        """
-        Create an Arrow Array from the given pandas.Series and optional type.
-
-        Parameters
-        ----------
-        series : pandas.Series
-            A single series
-        arrow_type : pyarrow.DataType, optional
-            If None, pyarrow's inferred type will be used
-        spark_type : DataType, optional
-            If None, spark type converted from arrow_type will be used
-        arrow_cast: bool, optional
-            Whether to apply Arrow casting when the user-specified return type mismatches the
-            actual return values.
-
-        Returns
-        -------
-        pyarrow.Array
-        """
+    def _create_array(self, series, arrow_type):
         import pyarrow as pa
-        from pandas.api.types import is_categorical_dtype

-        if is_categorical_dtype(series.dtype):
-            series = series.astype(series.dtypes.categories.dtype)
-        if arrow_type is not None:
-            dt = spark_type or from_arrow_type(arrow_type, prefer_timestamp_ntz=True)
-            # TODO(SPARK-43579): cache the converter for reuse
-            conv = _create_converter_from_pandas(
-                dt, timezone=self._timezone, error_on_duplicated_field_names=False
-            )
-            series = conv(series)
+        import geopandas as gpd

         if hasattr(series.array, "__arrow_array__"):
             mask = None
         else:
             mask = series.isnull()
+
         try:
-            try:
-                import geopandas as gpd
-                if isinstance(series, gpd.GeoSeries):
-                    import geoarrow.pyarrow as ga
-                    # If the series is a GeoSeries, convert it to an Arrow array using geoarrow
-                    return ga.array(series)
-
-                array = pa.Array.from_pandas(
-                    series, mask=mask, type=arrow_type, safe=self._safecheck
-                )
-
-                return array
-            except pa.lib.ArrowInvalid:
-                if arrow_cast:
-                    return pa.Array.from_pandas(series, mask=mask).cast(
-                        target_type=arrow_type, safe=self._safecheck
-                    )
-                else:
-                    raise
+            if isinstance(series, gpd.GeoSeries):
+                import geoarrow.pyarrow as ga
+                # If the series is a GeoSeries, convert it to an Arrow array using geoarrow
+                return ga.array(series)
+
+            array = pa.Array.from_pandas(series, mask=mask, type=arrow_type)
+            return array
         except TypeError as e:
             error_msg = (
                 "Exception thrown when converting pandas.Series (%s) "
                 "with name '%s' to Arrow Array (%s)."
             )
@@ -165,13 +74,6 @@ class SedonaArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
                 "Exception thrown when converting pandas.Series (%s) "
                 "with name '%s' to Arrow Array (%s)."
             )
-            if self._safecheck:
-                error_msg = error_msg + (
-                    " It can be caused by overflows or other "
-                    "unsafe conversions warned by Arrow. Arrow safe type check "
-                    "can be disabled by using SQL config "
-                    "`spark.sql.execution.pandas.convertToArrowArraySafely`."
-                )
             raise PySparkValueError(error_msg % (series.dtype, series.name, arrow_type)) from e

     def dump_stream(self, iterator, stream):
@@ -195,5 +97,4 @@ class SedonaArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):

     def __repr__(self):
         return "ArrowStreamPandasUDFSerializer"

-from pyspark.serializers import Serializer, write_int
diff --git a/sedonaworker/worker.py b/sedonaworker/worker.py
index 2904fab764..98ee38242b 100644
--- a/sedonaworker/worker.py
+++ b/sedonaworker/worker.py
@@ -18,19 +18,14 @@
 """
 Worker that receives input from Piped RDD.
 """
-import logging
 import os
 import sys
 import time
-from inspect import currentframe, getframeinfo, getfullargspec
+from inspect import currentframe, getframeinfo
 import importlib
-import json
-from io import BufferedRWPair
-from typing import Any, Iterable, Iterator

 from sedonaworker.serializer import SedonaArrowStreamPandasUDFSerializer

-# 'resource' is a Unix specific module.
 has_resource_module = True
 try:
     import resource
@@ -57,13 +52,10 @@ from pyspark.serializers import (
     SpecialLengths,
     UTF8Deserializer,
     CPickleSerializer,
-    BatchedSerializer,
-)
-from pyspark.sql.pandas.serializers import (
-    ArrowStreamPandasUDFSerializer,
     ArrowStreamSerializer
 )
+
 from pyspark.sql.pandas.types import to_arrow_type
-from pyspark.sql.types import BinaryType, StringType, StructType, _parse_datatype_json_string
+from pyspark.sql.types import StructType
 from pyspark.util import fail_on_stopiteration, try_simplify_traceback
 from pyspark import shuffle
 from pyspark.errors import PySparkRuntimeError, PySparkTypeError
@@ -72,51 +64,6 @@ pickleSer = CPickleSerializer()
 utf8_deserializer = UTF8Deserializer()


-class SedonaArrowStreamUDFSerializer(ArrowStreamSerializer):
-    """
-    Same as :class:`ArrowStreamSerializer` but it flattens the struct to Arrow record batch
-    for applying each function with the raw record arrow batch. See also `DataFrame.mapInArrow`.
-    """
-
-    def load_stream(self, stream):
-        """
-        Flatten the struct into Arrow's record batches.
-        """
-        import pyarrow as pa
-
-        batches = super(SedonaArrowStreamUDFSerializer, self).load_stream(stream)
-        for batch in batches:
-            struct = batch.column(0)
-            yield [pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))]
-
-    def dump_stream(self, iterator, stream):
-        """
-        Override because Pandas UDFs require a START_ARROW_STREAM before the Arrow stream is sent.
-        This should be sent after creating the first record batch so in case of an error, it can
-        be sent back to the JVM before the Arrow stream starts.
-        """
-        import pyarrow as pa
-
-        def wrap_and_init_stream():
-            should_write_start_length = True
-            for batch, _ in iterator:
-                assert isinstance(batch, pa.RecordBatch)
-
-                # Wrap the root struct
-                struct = pa.StructArray.from_arrays(
-                    batch.columns, fields=pa.struct(list(batch.schema))
-                )
-                batch = pa.RecordBatch.from_arrays([struct], ["_0"])
-
-                # Write the first record batch with initialization.
-                if should_write_start_length:
-                    write_int(SpecialLengths.START_ARROW_STREAM, stream)
-                    should_write_start_length = False
-                yield batch
-
-        return super(SedonaArrowStreamUDFSerializer, self).dump_stream(wrap_and_init_stream(), stream)
-
-
 def report_times(outfile, boot, init, finish):
     write_int(SpecialLengths.TIMING_DATA, outfile)
     write_long(int(1000 * boot), outfile)
@@ -185,19 +132,11 @@ def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index):
         else:
             chained_func = chain(chained_func, f)

-    if eval_type in (
-        PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
-        PythonEvalType.SQL_ARROW_BATCHED_UDF,
-    ):
-        func = chained_func
-    else:
-        # make sure StopIteration's raised in the user code are not ignored
-        # when they are processed in a for loop, raise them as RuntimeError's instead
-        func = fail_on_stopiteration(chained_func)
+    func = fail_on_stopiteration(chained_func)

     # the last returnType will be the return type of UDF
     if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
-        return arg_offsets, wrap_scalar_pandas_udf(func, return_type)
+        return arg_offsets, wrap_scalar_pandas_udf(func, return_type), return_type
     else:
         raise ValueError("Unknown eval type: {}".format(eval_type))

@@ -214,6 +153,8 @@ def assign_cols_by_name(runner_conf):


 def read_udfs(pickleSer, infile, eval_type):
+    from sedona.sql.types import GeometryType
+    import geopandas as gpd
     runner_conf = {}

     # Load conf used for pandas_udf evaluation
@@ -223,30 +164,12 @@ def read_udfs(pickleSer, infile, eval_type):
         v = utf8_deserializer.loads(infile)
         runner_conf[k] = v

-    # state_object_schema = None
-    # if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
-    #     state_object_schema = StructType.fromJson(json.loads(utf8_deserializer.loads(infile)))
-
-    # NOTE: if timezone is set here, that implies respectSessionTimeZone is True
     timezone = runner_conf.get("spark.sql.session.timeZone", None)
     safecheck = (
         runner_conf.get("spark.sql.execution.pandas.convertToArrowArraySafely", "false").lower()
         == "true"
     )
-    df_for_struct = (
-        eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF
-        or eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF
-        or eval_type == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF
-    )
-    # Arrow-optimized Python UDF takes a struct type argument as a Row
-    struct_in_pandas = (
-        "row" if eval_type == PythonEvalType.SQL_ARROW_BATCHED_UDF else "dict"
-    )
-    ndarray_as_list = eval_type == PythonEvalType.SQL_ARROW_BATCHED_UDF
-    # Arrow-optimized Python UDF uses explicit Arrow cast for type coercion
-    arrow_cast = eval_type == PythonEvalType.SQL_ARROW_BATCHED_UDF
-
     num_udfs = read_int(infile)

     udfs = []
@@ -254,13 +177,24 @@
         udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))

     def mapper(a):
-        result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
-        # In the special case of a single UDF this will return a single result rather
-        # than a tuple of results; this is the format that the JVM side expects.
-        if len(result) == 1:
-            return result[0]
+        results = []
+
+        for (arg_offsets, f, return_type) in udfs:
+            result = f(*[a[o] for o in arg_offsets])
+            if isinstance(return_type, GeometryType):
+                results.append((
+                    gpd.GeoSeries(result[0]),
+                    result[1],
+                ))
+
+                continue
+
+            results.append(result)
+
+        if len(results) == 1:
+            return results[0]
         else:
-            return result
+            return results

     def func(_, it):
         return map(mapper, it)

@@ -269,10 +203,6 @@
         timezone,
         safecheck,
         assign_cols_by_name(runner_conf),
-        df_for_struct,
-        struct_in_pandas,
-        ndarray_as_list,
-        arrow_cast,
     )

     # profiling is not supported for UDF
@@ -493,11 +423,6 @@ if __name__ == "__main__":
     java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"])
     auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"]
     (sock_file, _) = local_connect_and_auth(java_port, auth_secret)
-    # TODO: Remove the following two lines and use `Process.pid()` when we drop JDK 8.
     write_int(os.getpid(), sock_file)
     sock_file.flush()
     main(sock_file, sock_file)
-
-
-class GeoArrowLoader:
-    pass
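
Editor's note (not part of the commit): the snippet below is a minimal standalone sketch of the conversion path that the new SedonaArrowStreamPandasUDFSerializer._create_array takes. It assumes pandas, geopandas, shapely, pyarrow, and geoarrow-pyarrow are installed and that ga.array() accepts a GeoSeries the way the serializer uses it; the helper name to_arrow_column is illustrative only.

import pandas as pd
import geopandas as gpd
import pyarrow as pa
import geoarrow.pyarrow as ga
from shapely.geometry import Point


def to_arrow_column(series, arrow_type=None):
    # Mirrors the branch in _create_array: a GeoSeries goes through geoarrow,
    # any other pandas Series goes through pyarrow with an optional null mask.
    if isinstance(series, gpd.GeoSeries):
        return ga.array(series)
    mask = None if hasattr(series.array, "__arrow_array__") else series.isnull()
    return pa.Array.from_pandas(series, mask=mask, type=arrow_type)


geoms = gpd.GeoSeries([Point(0, 0), Point(1, 1)])
ids = pd.Series([10, 11])

batch = pa.RecordBatch.from_arrays(
    [to_arrow_column(geoms), to_arrow_column(ids, pa.int64())],
    ["_0", "_1"],
)
print(batch.schema)  # "_0" carries a geoarrow extension type, "_1" is int64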

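Editor's note (not part of the commit): a simplified, standalone sketch of the new mapper logic in sedonaworker/worker.py. Each UDF entry now carries its Spark return type, and when that type is Sedona's GeometryType the pandas result is wrapped in a geopandas GeoSeries so the serializer above emits it via geoarrow. The fake UDF, the hand-built udfs list, and the column inputs are illustrative stand-ins, not Spark internals.

import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
from sedona.sql.types import GeometryType


def fake_geometry_udf(xs, ys):
    # Stand-in for a wrapped scalar pandas UDF: returns (result_series, arrow_return_type);
    # None stands in for the arrow type here.
    return pd.Series([Point(x, y) for x, y in zip(xs, ys)]), None


udfs = [((0, 1), fake_geometry_udf, GeometryType())]  # (arg_offsets, f, return_type)


def mapper(columns):
    results = []
    for arg_offsets, f, return_type in udfs:
        result = f(*[columns[o] for o in arg_offsets])
        if isinstance(return_type, GeometryType):
            # Wrap the geometry result so it is serialized through geoarrow.
            results.append((gpd.GeoSeries(result[0]), result[1]))
        else:
            results.append(result)
    return results[0] if len(results) == 1 else results


series, _ = mapper([pd.Series([0.0, 1.0]), pd.Series([2.0, 3.0])])
print(type(series))  # geopandas.geoseries.GeoSeries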