andygrove commented on code in PR #1862:
URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2138821947


##########
native/core/src/execution/shuffle/range_partitioner.rs:
##########
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{ArrayRef, UInt64Array};
+use arrow::compute::{take_arrays, TakeOptions};
+use arrow::row::{Row, RowConverter, Rows, SortField};
+use datafusion::physical_expr::LexOrdering;
+use rand::{rngs::SmallRng, Rng, SeedableRng};
+
+pub struct RangePartitioner;
+
+impl RangePartitioner {
+    /// Given a number of rows, sample size, and a random seed, generates 
unique indices to take
+    /// from an input batch to act as a random sample.
+    /// Adapted from 
https://en.wikipedia.org/wiki/Reservoir_sampling#Optimal:_Algorithm_L
+    /// We use sample_size instead of k and num_rows instead of n.
+    /// We use indices instead of actual values in the reservoir  since we'll 
do one take() on the
+    /// input arrays at the end.
+    pub fn reservoir_sample_indices(num_rows: usize, sample_size: usize, seed: 
u64) -> Vec<u64> {
+        assert!(sample_size > 0);
+        assert!(
+            num_rows > sample_size,
+            "Sample size > num_rows yields original batch."
+        );
+
+        // Initialize our reservoir with indices of the first |sample_size| 
elements.
+        let mut reservoir: Vec<u64> = (0..sample_size as u64).collect();
+
+        let mut rng = SmallRng::seed_from_u64(seed);
+        let mut w = (rng.random::<f64>().ln() / sample_size as f64).exp();
+        let mut i = sample_size - 1;
+
+        while i < num_rows {
+            i += (rng.random::<f64>().ln() / (1.0 - w).ln()).floor() as usize 
+ 1;
+
+            if i < num_rows {
+                // Replace a random item in the reservoir with i
+                let random_index = rng.random_range(0..sample_size);
+                reservoir[random_index] = i as u64;
+                w *= (rng.random::<f64>().ln() / sample_size as f64).exp();
+            }
+        }
+
+        reservoir
+    }
+
+    /// Given a batch of Rows, an ordered vector of Rows that represent 
partition boundaries, and
+    /// a slice with enough space for the input batch, determines a partition 
id for every input
+    /// Row using binary search.
+    pub fn partition_indices_for_batch(
+        row_batch: &Rows,
+        partition_bounds_vec: &Vec<Row>,
+        partition_ids: &mut [u32],
+    ) {
+        row_batch.iter().enumerate().for_each(|(row_idx, row)| {
+            partition_ids[row_idx] =
+                partition_bounds_vec.partition_point(|bound| *bound <= row) as 
u32
+        });
+    }
+
+    /// Given input arrays and range partitioning metadata: samples the input 
arrays, generates
+    /// partition bounds, and returns Rows (for comparison against) and a 
RowConverter (for
+    /// adapting future incoming batches).
+    pub fn generate_bounds(
+        partition_arrays: &Vec<ArrayRef>,
+        lex_ordering: &LexOrdering,
+        num_output_partitions: usize,
+        num_rows: usize,
+        sample_size: usize,
+        seed: u64,
+    ) -> (Rows, RowConverter) {
+        let sampled_columns = if sample_size < num_rows {
+            // Construct our sample indices.
+            let sample_indices = 
UInt64Array::from(RangePartitioner::reservoir_sample_indices(
+                num_rows,
+                sample_size,
+                seed,
+            ));
+
+            // Extract our sampled data from the input data.
+            take_arrays(
+                partition_arrays,
+                &sample_indices,
+                Some(TakeOptions {
+                    check_bounds: false,
+                }),
+            )
+            .unwrap()
+        } else {
+            // Requested sample_size is larger than the batch, so just use the 
batch.
+            partition_arrays.clone()
+        };
+
+        // Generate our bounds indices.
+        let sort_fields: Vec<SortField> = partition_arrays
+            .iter()
+            .zip(lex_ordering)
+            .map(|(array, sort_expr)| {
+                SortField::new_with_options(array.data_type().clone(), 
sort_expr.options)
+            })
+            .collect();
+
+        let (bounds_indices, row_converter) = 
RangePartitioner::determine_bounds_for_rows(
+            sort_fields,
+            sampled_columns.as_slice(),
+            num_output_partitions,
+        );
+
+        // Extract our bounds data from the sampled data.
+        let bounds_indices_array = UInt64Array::from(bounds_indices);
+        let bounds_arrays = take_arrays(
+            sampled_columns.as_slice(),
+            &bounds_indices_array,
+            Some(TakeOptions {
+                check_bounds: false,
+            }),
+        )
+        .unwrap();
+
+        // Convert the bounds data to Rows and return with RowConverter.
+        (
+            row_converter
+                .convert_columns(bounds_arrays.as_slice())
+                .unwrap(),
+            row_converter,
+        )
+    }
+
+    /// Given a sort ordering, sampled data, and a number of target 
partitions, finds the partition
+    /// bounds and returns them as indices into the sampled data.
+    /// Adapted from org.apache.spark.RangePartitioner.determineBounds but 
without weighted
+    /// values since we don't have cross-partition samples to merge.
+    pub fn determine_bounds_for_rows(
+        sort_fields: Vec<SortField>,
+        sampled_columns: &[ArrayRef],
+        partitions: usize,
+    ) -> (Vec<u64>, RowConverter) {

Review Comment:
   Same comment as above re error handling



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to