andygrove commented on code in PR #1862: URL: https://github.com/apache/datafusion-comet/pull/1862#discussion_r2138821450
########## native/core/src/execution/shuffle/range_partitioner.rs: ########## @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, UInt64Array}; +use arrow::compute::{take_arrays, TakeOptions}; +use arrow::row::{Row, RowConverter, Rows, SortField}; +use datafusion::physical_expr::LexOrdering; +use rand::{rngs::SmallRng, Rng, SeedableRng}; + +pub struct RangePartitioner; + +impl RangePartitioner { + /// Given a number of rows, sample size, and a random seed, generates unique indices to take + /// from an input batch to act as a random sample. + /// Adapted from https://en.wikipedia.org/wiki/Reservoir_sampling#Optimal:_Algorithm_L + /// We use sample_size instead of k and num_rows instead of n. + /// We use indices instead of actual values in the reservoir since we'll do one take() on the + /// input arrays at the end. + pub fn reservoir_sample_indices(num_rows: usize, sample_size: usize, seed: u64) -> Vec<u64> { + assert!(sample_size > 0); + assert!( + num_rows > sample_size, + "Sample size > num_rows yields original batch." + ); + + // Initialize our reservoir with indices of the first |sample_size| elements. + let mut reservoir: Vec<u64> = (0..sample_size as u64).collect(); + + let mut rng = SmallRng::seed_from_u64(seed); + let mut w = (rng.random::<f64>().ln() / sample_size as f64).exp(); + let mut i = sample_size - 1; + + while i < num_rows { + i += (rng.random::<f64>().ln() / (1.0 - w).ln()).floor() as usize + 1; + + if i < num_rows { + // Replace a random item in the reservoir with i + let random_index = rng.random_range(0..sample_size); + reservoir[random_index] = i as u64; + w *= (rng.random::<f64>().ln() / sample_size as f64).exp(); + } + } + + reservoir + } + + /// Given a batch of Rows, an ordered vector of Rows that represent partition boundaries, and + /// a slice with enough space for the input batch, determines a partition id for every input + /// Row using binary search. + pub fn partition_indices_for_batch( + row_batch: &Rows, + partition_bounds_vec: &Vec<Row>, + partition_ids: &mut [u32], + ) { + row_batch.iter().enumerate().for_each(|(row_idx, row)| { + partition_ids[row_idx] = + partition_bounds_vec.partition_point(|bound| *bound <= row) as u32 + }); + } + + /// Given input arrays and range partitioning metadata: samples the input arrays, generates + /// partition bounds, and returns Rows (for comparison against) and a RowConverter (for + /// adapting future incoming batches). + pub fn generate_bounds( + partition_arrays: &Vec<ArrayRef>, + lex_ordering: &LexOrdering, + num_output_partitions: usize, + num_rows: usize, + sample_size: usize, + seed: u64, + ) -> (Rows, RowConverter) { Review Comment: We should return a `Result` here and replace the unwraps with `?` in this function -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org