This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch arrow-worker
in repository https://gitbox.apache.org/repos/asf/sedona.git
commit 323ca112ce0d95f970d7c0f64d907def01074f94
Author: pawelkocinski <[email protected]>
AuthorDate: Sun Jul 27 00:29:41 2025 +0200

    SEDONA-738 Fix unit tests.
---
 sedonaworker/worker.py | 126 +++++--------------------------------------------
 1 file changed, 12 insertions(+), 114 deletions(-)

diff --git a/sedonaworker/worker.py b/sedonaworker/worker.py
index 42fb20beb3..365561f0a6 100644
--- a/sedonaworker/worker.py
+++ b/sedonaworker/worker.py
@@ -204,8 +204,6 @@ def read_udfs(pickleSer, infile, eval_type):
 
         if eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
             ser = CogroupUDFSerializer(timezone, safecheck, assign_cols_by_name(runner_conf))
-        elif eval_type == PythonEvalType.SQL_MAP_ARROW_ITER_UDF:
-            ser = ArrowStreamUDFSerializer()
         else:
             # Scalar Pandas UDF handles struct type arguments as pandas DataFrames instead of
             # pandas Series. See SPARK-27240.
@@ -300,118 +298,18 @@ def read_udfs(pickleSer, infile, eval_type):
 
         # profiling is not supported for UDF
         return func, None, ser, ser
-
-    def extract_key_value_indexes(grouped_arg_offsets):
-        """
-        Helper function to extract the key and value indexes from arg_offsets for the grouped and
-        cogrouped pandas udfs. See BasePandasGroupExec.resolveArgOffsets for equivalent scala code.
-
-        Parameters
-        ----------
-        grouped_arg_offsets: list
-            List containing the key and value indexes of columns of the
-            DataFrames to be passed to the udf. It consists of n repeating groups where n is the
-            number of DataFrames. Each group has the following format:
-                group[0]: length of group
-                group[1]: length of key indexes
-                group[2.. group[1] +2]: key attributes
-                group[group[1] +3 group[0]]: value attributes
-        """
-        parsed = []
-        idx = 0
-        while idx < len(grouped_arg_offsets):
-            offsets_len = grouped_arg_offsets[idx]
-            idx += 1
-            offsets = grouped_arg_offsets[idx : idx + offsets_len]
-            split_index = offsets[0] + 1
-            offset_keys = offsets[1:split_index]
-            offset_values = offsets[split_index:]
-            parsed.append([offset_keys, offset_values])
-            idx += offsets_len
-        return parsed
-
-    if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
-        # We assume there is only one UDF here because grouped map doesn't
-        # support combining multiple UDFs.
-        assert num_udfs == 1
-
-        # See FlatMapGroupsInPandasExec for how arg_offsets are used to
-        # distinguish between grouping attributes and data attributes
-        arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
-        parsed_offsets = extract_key_value_indexes(arg_offsets)
-
-        # Create function like this:
-        #   mapper a: f([a[0]], [a[0], a[1]])
-        def mapper(a):
-            keys = [a[o] for o in parsed_offsets[0][0]]
-            vals = [a[o] for o in parsed_offsets[0][1]]
-            return f(keys, vals)
-
-    elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
-        # We assume there is only one UDF here because grouped map doesn't
-        # support combining multiple UDFs.
-        assert num_udfs == 1
-
-        # See FlatMapGroupsInPandas(WithState)Exec for how arg_offsets are used to
-        # distinguish between grouping attributes and data attributes
-        arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
-        parsed_offsets = extract_key_value_indexes(arg_offsets)
-
-        def mapper(a):
-            """
-            The function receives (iterator of data, state) and performs extraction of key and
-            value from the data, with retaining lazy evaluation.
-
-            See `load_stream` in `ApplyInPandasWithStateSerializer` for more details on the input
-            and see `wrap_grouped_map_pandas_udf_with_state` for more details on how output will
-            be used.
-            """
-            from itertools import tee
-
-            state = a[1]
-            data_gen = (x[0] for x in a[0])
-
-            # We know there should be at least one item in the iterator/generator.
-            # We want to peek the first element to construct the key, hence applying
-            # tee to construct the key while we retain another iterator/generator
-            # for values.
-            keys_gen, values_gen = tee(data_gen)
-            keys_elem = next(keys_gen)
-            keys = [keys_elem[o] for o in parsed_offsets[0][0]]
-
-            # This must be generator comprehension - do not materialize.
-            vals = ([x[o] for o in parsed_offsets[0][1]] for x in values_gen)
-
-            return f(keys, vals, state)
-
-    elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
-        # We assume there is only one UDF here because cogrouped map doesn't
-        # support combining multiple UDFs.
-        assert num_udfs == 1
-        arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
-
-        parsed_offsets = extract_key_value_indexes(arg_offsets)
-
-        def mapper(a):
-            df1_keys = [a[0][o] for o in parsed_offsets[0][0]]
-            df1_vals = [a[0][o] for o in parsed_offsets[0][1]]
-            df2_keys = [a[1][o] for o in parsed_offsets[1][0]]
-            df2_vals = [a[1][o] for o in parsed_offsets[1][1]]
-            return f(df1_keys, df1_vals, df2_keys, df2_vals)
-
-    else:
-        udfs = []
-        for i in range(num_udfs):
-            udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
-
-        def mapper(a):
-            result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
-            # In the special case of a single UDF this will return a single result rather
-            # than a tuple of results; this is the format that the JVM side expects.
-            if len(result) == 1:
-                return result[0]
-            else:
-                return result
+    udfs = []
+    for i in range(num_udfs):
+        udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
+
+    def mapper(a):
+        result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
+        # In the special case of a single UDF this will return a single result rather
+        # than a tuple of results; this is the format that the JVM side expects.
+        if len(result) == 1:
+            return result[0]
+        else:
+            return result
 
     def func(_, it):
         return map(mapper, it)
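For readers skimming the change: after this commit the worker always takes the generic chained-UDF path shown in the added lines above. Below is a minimal, self-contained sketch of that dispatch logic; the toy UDFs and arg_offsets are made up for illustration and are not part of the Sedona worker.

    # Illustrative sketch only. Each entry mimics the (arg_offsets, f) pairs that
    # read_single_udf returns; the functions and offsets below are hypothetical.
    udfs = [
        ((0,), lambda x: x + 1),       # first UDF reads column 0
        ((1, 2), lambda x, y: x * y),  # second UDF reads columns 1 and 2
    ]

    def mapper(row):
        # Apply every UDF to the columns selected by its arg_offsets.
        result = tuple(f(*[row[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
        # A single UDF is unwrapped to a bare value, the format the JVM side expects.
        return result[0] if len(result) == 1 else result

    print(mapper([10, 3, 4]))  # -> (11, 12)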

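For context on the removed grouped/cogrouped branches: the docstring of the deleted extract_key_value_indexes helper describes a flat layout of [group length, number of key indexes, key offsets..., value offsets...], repeated once per DataFrame. A small standalone sketch of that parsing, using made-up offsets for two DataFrames with one key column and two value columns each:

    def extract_key_value_indexes(grouped_arg_offsets):
        # Mirrors the removed helper: walk the flat list group by group and
        # split each group into key offsets and value offsets.
        parsed = []
        idx = 0
        while idx < len(grouped_arg_offsets):
            offsets_len = grouped_arg_offsets[idx]
            idx += 1
            offsets = grouped_arg_offsets[idx : idx + offsets_len]
            split_index = offsets[0] + 1
            parsed.append([offsets[1:split_index], offsets[split_index:]])
            idx += offsets_len
        return parsed

    # Hypothetical offsets: two groups of [4, 1, 0, 1, 2].
    print(extract_key_value_indexes([4, 1, 0, 1, 2, 4, 1, 0, 1, 2]))
    # -> [[[0], [1, 2]], [[0], [1, 2]]]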