rluvaton commented on code in PR #20498:
URL: https://github.com/apache/datafusion/pull/20498#discussion_r2900081202
##########
datafusion/physical-expr-common/src/utils.rs:
##########
@@ -50,43 +62,361 @@ impl ExprPropertiesNode {
}
}
+/// If the mask selects more than this fraction of rows, use
+/// `set_slices()` to copy contiguous ranges. Otherwise iterate
+/// over individual positions using `set_indices()`.
+const SCATTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8;
+
/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`,
next values of `truthy`
/// are taken, when the mask evaluates `false` values null values are filled.
///
/// # Arguments
/// * `mask` - Boolean values used to determine where to put the `truthy`
values
/// * `truthy` - All values of this array are to scatter according to `mask`
into final result.
pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
- let truthy = truthy.to_data();
+ let mask = match mask.null_count() {
+ 0 => Cow::Borrowed(mask),
+ _ => Cow::Owned(prep_null_mask_filter(mask)),
+ };
+
+ let output_len = mask.len();
+ let count = mask.true_count();
+
+ // Fast path: no true values mean all-null object
+ if count == 0 {
+ return Ok(new_null_array(truthy.data_type(), output_len));
+ }
+
+ // Fast path: all true means output = truthy
+ if count == output_len {
+ return Ok(truthy.slice(0, truthy.len()));
+ }
+
+ let selectivity = count as f64 / output_len as f64;
+ let mask_buffer = mask.values();
+
+ scatter_array(truthy, mask_buffer, output_len, selectivity)
+}
+
+fn scatter_array(
+ truthy: &dyn Array,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Result<ArrayRef> {
+ downcast_primitive_array! {
+ truthy => Ok(Arc::new(scatter_primitive(truthy, mask, output_len,
selectivity))),
+ DataType::Boolean => {
+ Ok(Arc::new(scatter_boolean(truthy.as_boolean(), mask, output_len,
selectivity)))
+ }
+ DataType::Utf8 => {
+ Ok(Arc::new(scatter_bytes(truthy.as_string::<i32>(), mask,
output_len, selectivity)))
+ }
+ DataType::LargeUtf8 => {
+ Ok(Arc::new(scatter_bytes(truthy.as_string::<i64>(), mask,
output_len, selectivity)))
+ }
+ DataType::Utf8View => {
+ Ok(Arc::new(scatter_byte_view(truthy.as_string_view(), mask,
output_len, selectivity)))
+ }
+ DataType::Binary => {
+ Ok(Arc::new(scatter_bytes(truthy.as_binary::<i32>(), mask,
output_len, selectivity)))
+ }
+ DataType::LargeBinary => {
+ Ok(Arc::new(scatter_bytes(truthy.as_binary::<i64>(), mask,
output_len, selectivity)))
+ }
+ DataType::BinaryView => {
+ Ok(Arc::new(scatter_byte_view(truthy.as_binary_view(), mask,
output_len, selectivity)))
+ }
+ DataType::FixedSizeBinary(_) => {
+ Ok(Arc::new(scatter_fixed_size_binary(
+ truthy.as_fixed_size_binary(), mask, output_len, selectivity,
+ )))
+ }
+ DataType::Dictionary(_, _) => {
+ downcast_dictionary_array! {
+ truthy => Ok(Arc::new(scatter_dict(truthy, mask, output_len,
selectivity))),
+ _t => scatter_fallback(truthy, mask, output_len)
+ }
+ }
+ _ => scatter_fallback(truthy, mask, output_len)
+ }
+}
+
+#[inline(never)]
+fn scatter_native<T: ArrowNativeType>(
+ src: &[T],
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Buffer {
+ let mut output = vec![T::default(); output_len];
+ let mut src_offset = 0;
+
+ if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
+ for (start, end) in mask.set_slices() {
+ let len = end - start;
+ output[start..end].copy_from_slice(&src[src_offset..src_offset +
len]);
+ src_offset += len;
+ }
+ } else {
+ for dst_idx in mask.set_indices() {
+ output[dst_idx] = src[src_offset];
+ src_offset += 1;
+ }
+ }
+
+ output.into()
+}
+
+fn scatter_bits(
+ src: &BooleanBuffer,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Buffer {
+ let mut builder = BooleanBufferBuilder::new(output_len);
+ builder.advance(output_len);
+ let mut src_offset = 0;
+
+ if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
+ for (start, end) in mask.set_slices() {
+ for i in start..end {
+ if src.value(src_offset) {
+ builder.set_bit(i, true);
+ }
+ src_offset += 1;
+ }
+ }
+ } else {
+ for dst_idx in mask.set_indices() {
+ if src.value(src_offset) {
+ builder.set_bit(dst_idx, true);
+ }
+ src_offset += 1;
+ }
+ }
+
+ builder.finish().into_inner()
+}
+
+fn scatter_null_mask(
+ src_nulls: Option<&NullBuffer>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Option<(usize, Buffer)> {
+ let false_count = output_len - mask.count_set_bits();
+ let src_null_count = src_nulls.map(|n| n.null_count()).unwrap_or(0);
+
+ if src_null_count == 0 {
+ if false_count == 0 {
+ None
+ } else {
+ Some((false_count, mask.inner().clone()))
+ }
+ } else {
+ let src_nulls = src_nulls.unwrap();
+ let scattered = scatter_bits(src_nulls.inner(), mask, output_len,
selectivity);
+ let valid_count = scattered.count_set_bits_offset(0, output_len);
+ let null_count = output_len - valid_count;
+ if null_count == 0 {
+ None
+ } else {
+ Some((null_count, scattered))
+ }
+ }
+}
+
+fn scatter_primitive<T: ArrowPrimitiveType>(
+ truthy: &PrimitiveArray<T>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> PrimitiveArray<T> {
+ let values = scatter_native(truthy.values(), mask, output_len,
selectivity);
+ let mut builder = ArrayDataBuilder::new(truthy.data_type().clone())
+ .len(output_len)
+ .add_buffer(values);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ PrimitiveArray::from(data)
+}
+
+fn scatter_boolean(
+ truthy: &BooleanArray,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> BooleanArray {
+ let values = scatter_bits(truthy.values(), mask, output_len, selectivity);
+ let mut builder = ArrayDataBuilder::new(DataType::Boolean)
+ .len(output_len)
+ .add_buffer(values);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ BooleanArray::from(data)
+}
+
+fn scatter_bytes<T: ByteArrayType>(
+ truthy: &GenericByteArray<T>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> GenericByteArray<T> {
+ let src_offsets = truthy.value_offsets();
+ let src_data = truthy.value_data();
+
+ // Build output offsets: false positions get zero-length (offset stays
same)
+ let mut dst_offsets: Vec<T::Offset> = Vec::with_capacity(output_len + 1);
+ let mut cur_offset = T::Offset::default();
+ dst_offsets.push(cur_offset);
+
+ let mut src_idx = 0;
+ for i in 0..output_len {
+ if mask.value(i) {
+ let len =
+ src_offsets[src_idx + 1].as_usize() -
src_offsets[src_idx].as_usize();
+ cur_offset += T::Offset::from_usize(len).unwrap();
+ src_idx += 1;
+ }
+ dst_offsets.push(cur_offset);
+ }
+
+ let byte_start = src_offsets[0].as_usize();
+ let byte_end = src_offsets[src_idx].as_usize();
+ let dst_data: Buffer = src_data[byte_start..byte_end].into();
+
+ let offsets_buffer: Buffer = dst_offsets.into();
+ let mut builder = ArrayDataBuilder::new(truthy.data_type().clone())
+ .len(output_len)
+ .add_buffer(offsets_buffer)
+ .add_buffer(dst_data);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ GenericByteArray::from(data)
+}
+
+fn scatter_byte_view<T: ByteViewType>(
+ truthy: &GenericByteViewArray<T>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> GenericByteViewArray<T> {
+ let new_views = scatter_native(truthy.views(), mask, output_len,
selectivity);
+
+ let mut builder = ArrayDataBuilder::new(T::DATA_TYPE)
+ .len(output_len)
+ .add_buffer(new_views)
+ .add_buffers(truthy.data_buffers().to_vec());
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ GenericByteViewArray::from(unsafe { builder.build_unchecked() })
+}
+
+fn scatter_fixed_size_binary(
+ truthy: &FixedSizeBinaryArray,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> FixedSizeBinaryArray {
+ let value_length = truthy.value_length() as usize;
+ let mut output = vec![0u8; output_len * value_length];
+ let mut src_idx = 0;
- // update the mask so that any null values become false
- // (SlicesIterator doesn't respect nulls)
- let mask = and_kleene(mask, &is_not_null(mask)?)?;
+ if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
+ for (start, end) in mask.set_slices() {
+ for dst_idx in start..end {
+ let src_bytes = truthy.value(src_idx);
+ let dst_start = dst_idx * value_length;
+ output[dst_start..dst_start +
value_length].copy_from_slice(src_bytes);
+ src_idx += 1;
+ }
+ }
+ } else {
+ for dst_idx in mask.set_indices() {
+ let src_bytes = truthy.value(src_idx);
+ let dst_start = dst_idx * value_length;
+ output[dst_start..dst_start +
value_length].copy_from_slice(src_bytes);
+ src_idx += 1;
+ }
+ }
- let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());
+ let mut builder = ArrayDataBuilder::new(truthy.data_type().clone())
+ .len(output_len)
+ .add_buffer(Buffer::from(output));
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ FixedSizeBinaryArray::from(data)
+}
+
+fn scatter_dict<K: ArrowDictionaryKeyType>(
+ truthy: &DictionaryArray<K>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> DictionaryArray<K> {
+ let scattered_keys = scatter_primitive(truthy.keys(), mask, output_len,
selectivity);
+ let builder = scattered_keys
+ .into_data()
+ .into_builder()
+ .data_type(truthy.data_type().clone())
+ .child_data(vec![truthy.values().to_data()]);
+ DictionaryArray::from(unsafe { builder.build_unchecked() })
Review Comment:
Please add a safety comment.
##########
datafusion/physical-expr-common/src/utils.rs:
##########
@@ -174,7 +502,6 @@ mod tests {
.into_iter()
.collect();
- // output should treat nulls as though they are false
Review Comment:
Why was this comment removed?
##########
datafusion/physical-expr-common/src/utils.rs:
##########
@@ -50,43 +62,361 @@ impl ExprPropertiesNode {
}
}
+/// If the mask selects more than this fraction of rows, use
+/// `set_slices()` to copy contiguous ranges. Otherwise iterate
+/// over individual positions using `set_indices()`.
+const SCATTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8;
+
/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`,
next values of `truthy`
/// are taken, when the mask evaluates `false` values null values are filled.
///
/// # Arguments
/// * `mask` - Boolean values used to determine where to put the `truthy`
values
/// * `truthy` - All values of this array are to scatter according to `mask`
into final result.
pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
- let truthy = truthy.to_data();
+ let mask = match mask.null_count() {
+ 0 => Cow::Borrowed(mask),
+ _ => Cow::Owned(prep_null_mask_filter(mask)),
Review Comment:
Nit: another optimization is possible here: if all values are null, create
the output array with the same null buffer and return it.
##########
datafusion/physical-expr-common/src/utils.rs:
##########
@@ -142,7 +472,6 @@ mod tests {
let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
let mask = BooleanArray::from(vec![true, true, false, false, true]);
- // the output array is expected to be the same length as the mask array
Review Comment:
Why was this comment removed?
##########
datafusion/physical-expr-common/src/utils.rs:
##########
@@ -50,43 +62,361 @@ impl ExprPropertiesNode {
}
}
+/// If the mask selects more than this fraction of rows, use
+/// `set_slices()` to copy contiguous ranges. Otherwise iterate
+/// over individual positions using `set_indices()`.
+const SCATTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8;
+
/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`,
next values of `truthy`
/// are taken, when the mask evaluates `false` values null values are filled.
///
/// # Arguments
/// * `mask` - Boolean values used to determine where to put the `truthy`
values
/// * `truthy` - All values of this array are to scatter according to `mask`
into final result.
pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
- let truthy = truthy.to_data();
+ let mask = match mask.null_count() {
+ 0 => Cow::Borrowed(mask),
+ _ => Cow::Owned(prep_null_mask_filter(mask)),
+ };
+
+ let output_len = mask.len();
+ let count = mask.true_count();
+
+ // Fast path: no true values mean all-null object
+ if count == 0 {
+ return Ok(new_null_array(truthy.data_type(), output_len));
+ }
+
+ // Fast path: all true means output = truthy
+ if count == output_len {
+ return Ok(truthy.slice(0, truthy.len()));
+ }
+
+ let selectivity = count as f64 / output_len as f64;
+ let mask_buffer = mask.values();
+
+ scatter_array(truthy, mask_buffer, output_len, selectivity)
+}
+
+fn scatter_array(
+ truthy: &dyn Array,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Result<ArrayRef> {
+ downcast_primitive_array! {
+ truthy => Ok(Arc::new(scatter_primitive(truthy, mask, output_len,
selectivity))),
+ DataType::Boolean => {
+ Ok(Arc::new(scatter_boolean(truthy.as_boolean(), mask, output_len,
selectivity)))
+ }
+ DataType::Utf8 => {
+ Ok(Arc::new(scatter_bytes(truthy.as_string::<i32>(), mask,
output_len, selectivity)))
+ }
+ DataType::LargeUtf8 => {
+ Ok(Arc::new(scatter_bytes(truthy.as_string::<i64>(), mask,
output_len, selectivity)))
+ }
+ DataType::Utf8View => {
+ Ok(Arc::new(scatter_byte_view(truthy.as_string_view(), mask,
output_len, selectivity)))
+ }
+ DataType::Binary => {
+ Ok(Arc::new(scatter_bytes(truthy.as_binary::<i32>(), mask,
output_len, selectivity)))
+ }
+ DataType::LargeBinary => {
+ Ok(Arc::new(scatter_bytes(truthy.as_binary::<i64>(), mask,
output_len, selectivity)))
+ }
+ DataType::BinaryView => {
+ Ok(Arc::new(scatter_byte_view(truthy.as_binary_view(), mask,
output_len, selectivity)))
+ }
+ DataType::FixedSizeBinary(_) => {
+ Ok(Arc::new(scatter_fixed_size_binary(
+ truthy.as_fixed_size_binary(), mask, output_len, selectivity,
+ )))
+ }
+ DataType::Dictionary(_, _) => {
+ downcast_dictionary_array! {
+ truthy => Ok(Arc::new(scatter_dict(truthy, mask, output_len,
selectivity))),
+ _t => scatter_fallback(truthy, mask, output_len)
+ }
+ }
+ _ => scatter_fallback(truthy, mask, output_len)
+ }
+}
+
+#[inline(never)]
+fn scatter_native<T: ArrowNativeType>(
+ src: &[T],
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Buffer {
+ let mut output = vec![T::default(); output_len];
+ let mut src_offset = 0;
+
+ if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
+ for (start, end) in mask.set_slices() {
+ let len = end - start;
+ output[start..end].copy_from_slice(&src[src_offset..src_offset +
len]);
+ src_offset += len;
+ }
+ } else {
+ for dst_idx in mask.set_indices() {
+ output[dst_idx] = src[src_offset];
+ src_offset += 1;
+ }
+ }
+
+ output.into()
+}
+
+fn scatter_bits(
+ src: &BooleanBuffer,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Buffer {
+ let mut builder = BooleanBufferBuilder::new(output_len);
+ builder.advance(output_len);
+ let mut src_offset = 0;
+
+ if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
+ for (start, end) in mask.set_slices() {
+ for i in start..end {
+ if src.value(src_offset) {
+ builder.set_bit(i, true);
+ }
+ src_offset += 1;
+ }
+ }
+ } else {
+ for dst_idx in mask.set_indices() {
+ if src.value(src_offset) {
+ builder.set_bit(dst_idx, true);
+ }
+ src_offset += 1;
+ }
+ }
+
+ builder.finish().into_inner()
+}
+
+fn scatter_null_mask(
+ src_nulls: Option<&NullBuffer>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Option<(usize, Buffer)> {
+ let false_count = output_len - mask.count_set_bits();
+ let src_null_count = src_nulls.map(|n| n.null_count()).unwrap_or(0);
+
+ if src_null_count == 0 {
+ if false_count == 0 {
+ None
+ } else {
+ Some((false_count, mask.inner().clone()))
+ }
+ } else {
+ let src_nulls = src_nulls.unwrap();
+ let scattered = scatter_bits(src_nulls.inner(), mask, output_len,
selectivity);
+ let valid_count = scattered.count_set_bits_offset(0, output_len);
+ let null_count = output_len - valid_count;
+ if null_count == 0 {
+ None
+ } else {
+ Some((null_count, scattered))
+ }
+ }
+}
+
+fn scatter_primitive<T: ArrowPrimitiveType>(
+ truthy: &PrimitiveArray<T>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> PrimitiveArray<T> {
+ let values = scatter_native(truthy.values(), mask, output_len,
selectivity);
+ let mut builder = ArrayDataBuilder::new(truthy.data_type().clone())
+ .len(output_len)
+ .add_buffer(values);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ PrimitiveArray::from(data)
+}
+
+fn scatter_boolean(
+ truthy: &BooleanArray,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> BooleanArray {
+ let values = scatter_bits(truthy.values(), mask, output_len, selectivity);
+ let mut builder = ArrayDataBuilder::new(DataType::Boolean)
+ .len(output_len)
+ .add_buffer(values);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ BooleanArray::from(data)
+}
+
+fn scatter_bytes<T: ByteArrayType>(
+ truthy: &GenericByteArray<T>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> GenericByteArray<T> {
+ let src_offsets = truthy.value_offsets();
+ let src_data = truthy.value_data();
+
+ // Build output offsets: false positions get zero-length (offset stays
same)
+ let mut dst_offsets: Vec<T::Offset> = Vec::with_capacity(output_len + 1);
+ let mut cur_offset = T::Offset::default();
+ dst_offsets.push(cur_offset);
+
+ let mut src_idx = 0;
+ for i in 0..output_len {
+ if mask.value(i) {
+ let len =
+ src_offsets[src_idx + 1].as_usize() -
src_offsets[src_idx].as_usize();
+ cur_offset += T::Offset::from_usize(len).unwrap();
+ src_idx += 1;
+ }
+ dst_offsets.push(cur_offset);
+ }
+
+ let byte_start = src_offsets[0].as_usize();
+ let byte_end = src_offsets[src_idx].as_usize();
+ let dst_data: Buffer = src_data[byte_start..byte_end].into();
+
+ let offsets_buffer: Buffer = dst_offsets.into();
+ let mut builder = ArrayDataBuilder::new(truthy.data_type().clone())
+ .len(output_len)
+ .add_buffer(offsets_buffer)
+ .add_buffer(dst_data);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ GenericByteArray::from(data)
+}
+
+fn scatter_byte_view<T: ByteViewType>(
+ truthy: &GenericByteViewArray<T>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> GenericByteViewArray<T> {
+ let new_views = scatter_native(truthy.views(), mask, output_len,
selectivity);
+
+ let mut builder = ArrayDataBuilder::new(T::DATA_TYPE)
+ .len(output_len)
+ .add_buffer(new_views)
+ .add_buffers(truthy.data_buffers().to_vec());
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ GenericByteViewArray::from(unsafe { builder.build_unchecked() })
Review Comment:
Please add a safety comment.
##########
datafusion/physical-expr-common/src/utils.rs:
##########
@@ -50,43 +62,361 @@ impl ExprPropertiesNode {
}
}
+/// If the mask selects more than this fraction of rows, use
+/// `set_slices()` to copy contiguous ranges. Otherwise iterate
+/// over individual positions using `set_indices()`.
+const SCATTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8;
+
/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`,
next values of `truthy`
/// are taken, when the mask evaluates `false` values null values are filled.
///
/// # Arguments
/// * `mask` - Boolean values used to determine where to put the `truthy`
values
/// * `truthy` - All values of this array are to scatter according to `mask`
into final result.
pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
- let truthy = truthy.to_data();
+ let mask = match mask.null_count() {
+ 0 => Cow::Borrowed(mask),
+ _ => Cow::Owned(prep_null_mask_filter(mask)),
+ };
+
+ let output_len = mask.len();
+ let count = mask.true_count();
+
+ // Fast path: no true values mean all-null object
+ if count == 0 {
+ return Ok(new_null_array(truthy.data_type(), output_len));
+ }
+
+ // Fast path: all true means output = truthy
+ if count == output_len {
+ return Ok(truthy.slice(0, truthy.len()));
+ }
+
+ let selectivity = count as f64 / output_len as f64;
+ let mask_buffer = mask.values();
+
+ scatter_array(truthy, mask_buffer, output_len, selectivity)
+}
+
+fn scatter_array(
+ truthy: &dyn Array,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Result<ArrayRef> {
+ downcast_primitive_array! {
+ truthy => Ok(Arc::new(scatter_primitive(truthy, mask, output_len,
selectivity))),
+ DataType::Boolean => {
+ Ok(Arc::new(scatter_boolean(truthy.as_boolean(), mask, output_len,
selectivity)))
+ }
+ DataType::Utf8 => {
+ Ok(Arc::new(scatter_bytes(truthy.as_string::<i32>(), mask,
output_len, selectivity)))
+ }
+ DataType::LargeUtf8 => {
+ Ok(Arc::new(scatter_bytes(truthy.as_string::<i64>(), mask,
output_len, selectivity)))
+ }
+ DataType::Utf8View => {
+ Ok(Arc::new(scatter_byte_view(truthy.as_string_view(), mask,
output_len, selectivity)))
+ }
+ DataType::Binary => {
+ Ok(Arc::new(scatter_bytes(truthy.as_binary::<i32>(), mask,
output_len, selectivity)))
+ }
+ DataType::LargeBinary => {
+ Ok(Arc::new(scatter_bytes(truthy.as_binary::<i64>(), mask,
output_len, selectivity)))
+ }
+ DataType::BinaryView => {
+ Ok(Arc::new(scatter_byte_view(truthy.as_binary_view(), mask,
output_len, selectivity)))
+ }
+ DataType::FixedSizeBinary(_) => {
+ Ok(Arc::new(scatter_fixed_size_binary(
+ truthy.as_fixed_size_binary(), mask, output_len, selectivity,
+ )))
+ }
+ DataType::Dictionary(_, _) => {
+ downcast_dictionary_array! {
+ truthy => Ok(Arc::new(scatter_dict(truthy, mask, output_len,
selectivity))),
+ _t => scatter_fallback(truthy, mask, output_len)
+ }
+ }
+ _ => scatter_fallback(truthy, mask, output_len)
+ }
+}
+
+#[inline(never)]
+fn scatter_native<T: ArrowNativeType>(
+ src: &[T],
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Buffer {
+ let mut output = vec![T::default(); output_len];
+ let mut src_offset = 0;
+
+ if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
+ for (start, end) in mask.set_slices() {
+ let len = end - start;
+ output[start..end].copy_from_slice(&src[src_offset..src_offset +
len]);
+ src_offset += len;
+ }
+ } else {
+ for dst_idx in mask.set_indices() {
+ output[dst_idx] = src[src_offset];
+ src_offset += 1;
+ }
+ }
+
+ output.into()
+}
+
+fn scatter_bits(
+ src: &BooleanBuffer,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Buffer {
+ let mut builder = BooleanBufferBuilder::new(output_len);
+ builder.advance(output_len);
+ let mut src_offset = 0;
+
+ if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
+ for (start, end) in mask.set_slices() {
+ for i in start..end {
+ if src.value(src_offset) {
+ builder.set_bit(i, true);
+ }
+ src_offset += 1;
+ }
+ }
+ } else {
+ for dst_idx in mask.set_indices() {
+ if src.value(src_offset) {
+ builder.set_bit(dst_idx, true);
+ }
+ src_offset += 1;
+ }
+ }
+
+ builder.finish().into_inner()
+}
+
+fn scatter_null_mask(
+ src_nulls: Option<&NullBuffer>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Option<(usize, Buffer)> {
+ let false_count = output_len - mask.count_set_bits();
+ let src_null_count = src_nulls.map(|n| n.null_count()).unwrap_or(0);
+
+ if src_null_count == 0 {
+ if false_count == 0 {
+ None
+ } else {
+ Some((false_count, mask.inner().clone()))
+ }
+ } else {
+ let src_nulls = src_nulls.unwrap();
+ let scattered = scatter_bits(src_nulls.inner(), mask, output_len,
selectivity);
+ let valid_count = scattered.count_set_bits_offset(0, output_len);
+ let null_count = output_len - valid_count;
+ if null_count == 0 {
+ None
+ } else {
+ Some((null_count, scattered))
+ }
+ }
+}
+
+fn scatter_primitive<T: ArrowPrimitiveType>(
+ truthy: &PrimitiveArray<T>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> PrimitiveArray<T> {
+ let values = scatter_native(truthy.values(), mask, output_len,
selectivity);
+ let mut builder = ArrayDataBuilder::new(truthy.data_type().clone())
+ .len(output_len)
+ .add_buffer(values);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ PrimitiveArray::from(data)
+}
+
+fn scatter_boolean(
+ truthy: &BooleanArray,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> BooleanArray {
+ let values = scatter_bits(truthy.values(), mask, output_len, selectivity);
+ let mut builder = ArrayDataBuilder::new(DataType::Boolean)
+ .len(output_len)
+ .add_buffer(values);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ BooleanArray::from(data)
+}
+
+fn scatter_bytes<T: ByteArrayType>(
+ truthy: &GenericByteArray<T>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> GenericByteArray<T> {
+ let src_offsets = truthy.value_offsets();
+ let src_data = truthy.value_data();
+
+ // Build output offsets: false positions get zero-length (offset stays
same)
+ let mut dst_offsets: Vec<T::Offset> = Vec::with_capacity(output_len + 1);
+ let mut cur_offset = T::Offset::default();
+ dst_offsets.push(cur_offset);
+
+ let mut src_idx = 0;
+ for i in 0..output_len {
+ if mask.value(i) {
+ let len =
+ src_offsets[src_idx + 1].as_usize() -
src_offsets[src_idx].as_usize();
+ cur_offset += T::Offset::from_usize(len).unwrap();
+ src_idx += 1;
+ }
+ dst_offsets.push(cur_offset);
+ }
+
+ let byte_start = src_offsets[0].as_usize();
+ let byte_end = src_offsets[src_idx].as_usize();
+ let dst_data: Buffer = src_data[byte_start..byte_end].into();
+
+ let offsets_buffer: Buffer = dst_offsets.into();
+ let mut builder = ArrayDataBuilder::new(truthy.data_type().clone())
+ .len(output_len)
+ .add_buffer(offsets_buffer)
+ .add_buffer(dst_data);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
Review Comment:
Please add a safety comment.
##########
datafusion/physical-expr-common/src/utils.rs:
##########
@@ -50,43 +62,361 @@ impl ExprPropertiesNode {
}
}
+/// If the mask selects more than this fraction of rows, use
+/// `set_slices()` to copy contiguous ranges. Otherwise iterate
+/// over individual positions using `set_indices()`.
+const SCATTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8;
+
/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`,
next values of `truthy`
/// are taken, when the mask evaluates `false` values null values are filled.
///
/// # Arguments
/// * `mask` - Boolean values used to determine where to put the `truthy`
values
/// * `truthy` - All values of this array are to scatter according to `mask`
into final result.
pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
- let truthy = truthy.to_data();
+ let mask = match mask.null_count() {
+ 0 => Cow::Borrowed(mask),
+ _ => Cow::Owned(prep_null_mask_filter(mask)),
+ };
+
+ let output_len = mask.len();
+ let count = mask.true_count();
+
+ // Fast path: no true values mean all-null object
+ if count == 0 {
+ return Ok(new_null_array(truthy.data_type(), output_len));
+ }
+
+ // Fast path: all true means output = truthy
+ if count == output_len {
+ return Ok(truthy.slice(0, truthy.len()));
+ }
+
+ let selectivity = count as f64 / output_len as f64;
+ let mask_buffer = mask.values();
+
+ scatter_array(truthy, mask_buffer, output_len, selectivity)
+}
+
+fn scatter_array(
+ truthy: &dyn Array,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Result<ArrayRef> {
+ downcast_primitive_array! {
+ truthy => Ok(Arc::new(scatter_primitive(truthy, mask, output_len,
selectivity))),
+ DataType::Boolean => {
+ Ok(Arc::new(scatter_boolean(truthy.as_boolean(), mask, output_len,
selectivity)))
+ }
+ DataType::Utf8 => {
+ Ok(Arc::new(scatter_bytes(truthy.as_string::<i32>(), mask,
output_len, selectivity)))
+ }
+ DataType::LargeUtf8 => {
+ Ok(Arc::new(scatter_bytes(truthy.as_string::<i64>(), mask,
output_len, selectivity)))
+ }
+ DataType::Utf8View => {
+ Ok(Arc::new(scatter_byte_view(truthy.as_string_view(), mask,
output_len, selectivity)))
+ }
+ DataType::Binary => {
+ Ok(Arc::new(scatter_bytes(truthy.as_binary::<i32>(), mask,
output_len, selectivity)))
+ }
+ DataType::LargeBinary => {
+ Ok(Arc::new(scatter_bytes(truthy.as_binary::<i64>(), mask,
output_len, selectivity)))
+ }
+ DataType::BinaryView => {
+ Ok(Arc::new(scatter_byte_view(truthy.as_binary_view(), mask,
output_len, selectivity)))
+ }
+ DataType::FixedSizeBinary(_) => {
+ Ok(Arc::new(scatter_fixed_size_binary(
+ truthy.as_fixed_size_binary(), mask, output_len, selectivity,
+ )))
+ }
+ DataType::Dictionary(_, _) => {
+ downcast_dictionary_array! {
+ truthy => Ok(Arc::new(scatter_dict(truthy, mask, output_len,
selectivity))),
+ _t => scatter_fallback(truthy, mask, output_len)
+ }
+ }
+ _ => scatter_fallback(truthy, mask, output_len)
+ }
+}
+
+#[inline(never)]
+fn scatter_native<T: ArrowNativeType>(
+ src: &[T],
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Buffer {
+ let mut output = vec![T::default(); output_len];
+ let mut src_offset = 0;
+
+ if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
+ for (start, end) in mask.set_slices() {
+ let len = end - start;
+ output[start..end].copy_from_slice(&src[src_offset..src_offset +
len]);
+ src_offset += len;
+ }
+ } else {
+ for dst_idx in mask.set_indices() {
+ output[dst_idx] = src[src_offset];
+ src_offset += 1;
+ }
+ }
+
+ output.into()
+}
+
+fn scatter_bits(
+ src: &BooleanBuffer,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Buffer {
+ let mut builder = BooleanBufferBuilder::new(output_len);
+ builder.advance(output_len);
+ let mut src_offset = 0;
+
+ if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
+ for (start, end) in mask.set_slices() {
+ for i in start..end {
+ if src.value(src_offset) {
+ builder.set_bit(i, true);
+ }
+ src_offset += 1;
+ }
+ }
+ } else {
+ for dst_idx in mask.set_indices() {
+ if src.value(src_offset) {
+ builder.set_bit(dst_idx, true);
+ }
+ src_offset += 1;
+ }
+ }
+
+ builder.finish().into_inner()
+}
+
+fn scatter_null_mask(
+ src_nulls: Option<&NullBuffer>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Option<(usize, Buffer)> {
+ let false_count = output_len - mask.count_set_bits();
+ let src_null_count = src_nulls.map(|n| n.null_count()).unwrap_or(0);
+
+ if src_null_count == 0 {
+ if false_count == 0 {
+ None
+ } else {
+ Some((false_count, mask.inner().clone()))
+ }
+ } else {
+ let src_nulls = src_nulls.unwrap();
+ let scattered = scatter_bits(src_nulls.inner(), mask, output_len,
selectivity);
+ let valid_count = scattered.count_set_bits_offset(0, output_len);
+ let null_count = output_len - valid_count;
+ if null_count == 0 {
+ None
+ } else {
+ Some((null_count, scattered))
+ }
+ }
+}
+
+fn scatter_primitive<T: ArrowPrimitiveType>(
+ truthy: &PrimitiveArray<T>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> PrimitiveArray<T> {
+ let values = scatter_native(truthy.values(), mask, output_len,
selectivity);
+ let mut builder = ArrayDataBuilder::new(truthy.data_type().clone())
+ .len(output_len)
+ .add_buffer(values);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ PrimitiveArray::from(data)
+}
+
+fn scatter_boolean(
+ truthy: &BooleanArray,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> BooleanArray {
+ let values = scatter_bits(truthy.values(), mask, output_len, selectivity);
+ let mut builder = ArrayDataBuilder::new(DataType::Boolean)
+ .len(output_len)
+ .add_buffer(values);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ BooleanArray::from(data)
+}
+
+fn scatter_bytes<T: ByteArrayType>(
Review Comment:
FYI, nulls can still have non-empty bytes. The previous implementation,
which used `MutableArrayData`, set empty bytes for each null, which this
implementation does not.
For example, if I have a string array:
```
bytes: abcdefgh
offsets: 0, 3, 5, 7, 8
validity: true, false, true, true
```
Note that the null entry points to `de`.
And the mask is:
```
true, true, true, false
```
The previous implementation would create the following output:
```
bytes: abcfg
offsets: 0, 3, 3, 5
validity: true, false, true
```
(The null value points to `` (empty).)
Your implementation, by contrast, points to `de`, since you copy both the
bytes and the offsets.
Both approaches are OK; I am just raising the difference.
##########
datafusion/physical-expr-common/src/utils.rs:
##########
@@ -157,7 +486,6 @@ mod tests {
let truthy = Arc::new(Int32Array::from(vec![1, 10, 11, 100]));
let mask = BooleanArray::from(vec![true, false, true, false, false,
false]);
- // output should be same length as mask
Review Comment:
Why was this comment removed?
##########
datafusion/physical-expr-common/src/utils.rs:
##########
@@ -50,43 +62,361 @@ impl ExprPropertiesNode {
}
}
+/// If the mask selects more than this fraction of rows, use
+/// `set_slices()` to copy contiguous ranges. Otherwise iterate
+/// over individual positions using `set_indices()`.
+const SCATTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8;
+
/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`,
next values of `truthy`
/// are taken, when the mask evaluates `false` values null values are filled.
///
/// # Arguments
/// * `mask` - Boolean values used to determine where to put the `truthy`
values
/// * `truthy` - All values of this array are to scatter according to `mask`
into final result.
pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
- let truthy = truthy.to_data();
+ let mask = match mask.null_count() {
+ 0 => Cow::Borrowed(mask),
+ _ => Cow::Owned(prep_null_mask_filter(mask)),
+ };
+
+ let output_len = mask.len();
+ let count = mask.true_count();
+
+ // Fast path: no true values mean all-null object
+ if count == 0 {
+ return Ok(new_null_array(truthy.data_type(), output_len));
+ }
+
+ // Fast path: all true means output = truthy
+ if count == output_len {
+ return Ok(truthy.slice(0, truthy.len()));
+ }
+
+ let selectivity = count as f64 / output_len as f64;
+ let mask_buffer = mask.values();
+
+ scatter_array(truthy, mask_buffer, output_len, selectivity)
+}
+
+fn scatter_array(
+ truthy: &dyn Array,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Result<ArrayRef> {
+ downcast_primitive_array! {
+ truthy => Ok(Arc::new(scatter_primitive(truthy, mask, output_len,
selectivity))),
+ DataType::Boolean => {
+ Ok(Arc::new(scatter_boolean(truthy.as_boolean(), mask, output_len,
selectivity)))
+ }
+ DataType::Utf8 => {
+ Ok(Arc::new(scatter_bytes(truthy.as_string::<i32>(), mask,
output_len, selectivity)))
+ }
+ DataType::LargeUtf8 => {
+ Ok(Arc::new(scatter_bytes(truthy.as_string::<i64>(), mask,
output_len, selectivity)))
+ }
+ DataType::Utf8View => {
+ Ok(Arc::new(scatter_byte_view(truthy.as_string_view(), mask,
output_len, selectivity)))
+ }
+ DataType::Binary => {
+ Ok(Arc::new(scatter_bytes(truthy.as_binary::<i32>(), mask,
output_len, selectivity)))
+ }
+ DataType::LargeBinary => {
+ Ok(Arc::new(scatter_bytes(truthy.as_binary::<i64>(), mask,
output_len, selectivity)))
+ }
+ DataType::BinaryView => {
+ Ok(Arc::new(scatter_byte_view(truthy.as_binary_view(), mask,
output_len, selectivity)))
+ }
+ DataType::FixedSizeBinary(_) => {
+ Ok(Arc::new(scatter_fixed_size_binary(
+ truthy.as_fixed_size_binary(), mask, output_len, selectivity,
+ )))
+ }
+ DataType::Dictionary(_, _) => {
+ downcast_dictionary_array! {
+ truthy => Ok(Arc::new(scatter_dict(truthy, mask, output_len,
selectivity))),
+ _t => scatter_fallback(truthy, mask, output_len)
+ }
+ }
+ _ => scatter_fallback(truthy, mask, output_len)
+ }
+}
+
+#[inline(never)]
+fn scatter_native<T: ArrowNativeType>(
+ src: &[T],
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Buffer {
+ let mut output = vec![T::default(); output_len];
+ let mut src_offset = 0;
+
+ if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
+ for (start, end) in mask.set_slices() {
+ let len = end - start;
+ output[start..end].copy_from_slice(&src[src_offset..src_offset +
len]);
+ src_offset += len;
+ }
+ } else {
+ for dst_idx in mask.set_indices() {
+ output[dst_idx] = src[src_offset];
+ src_offset += 1;
+ }
+ }
+
+ output.into()
+}
+
+fn scatter_bits(
+ src: &BooleanBuffer,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Buffer {
+ let mut builder = BooleanBufferBuilder::new(output_len);
+ builder.advance(output_len);
+ let mut src_offset = 0;
+
+ if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
+ for (start, end) in mask.set_slices() {
+ for i in start..end {
+ if src.value(src_offset) {
+ builder.set_bit(i, true);
+ }
+ src_offset += 1;
+ }
+ }
+ } else {
+ for dst_idx in mask.set_indices() {
+ if src.value(src_offset) {
+ builder.set_bit(dst_idx, true);
+ }
+ src_offset += 1;
+ }
+ }
+
+ builder.finish().into_inner()
+}
+
+fn scatter_null_mask(
+ src_nulls: Option<&NullBuffer>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Option<(usize, Buffer)> {
+ let false_count = output_len - mask.count_set_bits();
+ let src_null_count = src_nulls.map(|n| n.null_count()).unwrap_or(0);
+
+ if src_null_count == 0 {
+ if false_count == 0 {
+ None
+ } else {
+ Some((false_count, mask.inner().clone()))
+ }
+ } else {
+ let src_nulls = src_nulls.unwrap();
+ let scattered = scatter_bits(src_nulls.inner(), mask, output_len,
selectivity);
+ let valid_count = scattered.count_set_bits_offset(0, output_len);
+ let null_count = output_len - valid_count;
+ if null_count == 0 {
+ None
+ } else {
+ Some((null_count, scattered))
+ }
+ }
+}
+
+fn scatter_primitive<T: ArrowPrimitiveType>(
+ truthy: &PrimitiveArray<T>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> PrimitiveArray<T> {
+ let values = scatter_native(truthy.values(), mask, output_len,
selectivity);
+ let mut builder = ArrayDataBuilder::new(truthy.data_type().clone())
+ .len(output_len)
+ .add_buffer(values);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ PrimitiveArray::from(data)
+}
+
+fn scatter_boolean(
+ truthy: &BooleanArray,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> BooleanArray {
+ let values = scatter_bits(truthy.values(), mask, output_len, selectivity);
+ let mut builder = ArrayDataBuilder::new(DataType::Boolean)
+ .len(output_len)
+ .add_buffer(values);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ BooleanArray::from(data)
+}
+
+fn scatter_bytes<T: ByteArrayType>(
+ truthy: &GenericByteArray<T>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> GenericByteArray<T> {
+ let src_offsets = truthy.value_offsets();
+ let src_data = truthy.value_data();
+
+ // Build output offsets: false positions get zero-length (offset stays
same)
+ let mut dst_offsets: Vec<T::Offset> = Vec::with_capacity(output_len + 1);
+ let mut cur_offset = T::Offset::default();
+ dst_offsets.push(cur_offset);
+
+ let mut src_idx = 0;
+ for i in 0..output_len {
+ if mask.value(i) {
+ let len =
+ src_offsets[src_idx + 1].as_usize() -
src_offsets[src_idx].as_usize();
+ cur_offset += T::Offset::from_usize(len).unwrap();
+ src_idx += 1;
+ }
+ dst_offsets.push(cur_offset);
+ }
+
+ let byte_start = src_offsets[0].as_usize();
+ let byte_end = src_offsets[src_idx].as_usize();
+ let dst_data: Buffer = src_data[byte_start..byte_end].into();
+
+ let offsets_buffer: Buffer = dst_offsets.into();
+ let mut builder = ArrayDataBuilder::new(truthy.data_type().clone())
+ .len(output_len)
+ .add_buffer(offsets_buffer)
+ .add_buffer(dst_data);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ GenericByteArray::from(data)
+}
+
+fn scatter_byte_view<T: ByteViewType>(
+ truthy: &GenericByteViewArray<T>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> GenericByteViewArray<T> {
+ let new_views = scatter_native(truthy.views(), mask, output_len,
selectivity);
+
+ let mut builder = ArrayDataBuilder::new(T::DATA_TYPE)
+ .len(output_len)
+ .add_buffer(new_views)
+ .add_buffers(truthy.data_buffers().to_vec());
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ GenericByteViewArray::from(unsafe { builder.build_unchecked() })
+}
+
+fn scatter_fixed_size_binary(
+ truthy: &FixedSizeBinaryArray,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> FixedSizeBinaryArray {
+ let value_length = truthy.value_length() as usize;
+ let mut output = vec![0u8; output_len * value_length];
+ let mut src_idx = 0;
- // update the mask so that any null values become false
- // (SlicesIterator doesn't respect nulls)
- let mask = and_kleene(mask, &is_not_null(mask)?)?;
+ if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
+ for (start, end) in mask.set_slices() {
+ for dst_idx in start..end {
+ let src_bytes = truthy.value(src_idx);
+ let dst_start = dst_idx * value_length;
+ output[dst_start..dst_start +
value_length].copy_from_slice(src_bytes);
+ src_idx += 1;
+ }
+ }
+ } else {
+ for dst_idx in mask.set_indices() {
+ let src_bytes = truthy.value(src_idx);
+ let dst_start = dst_idx * value_length;
+ output[dst_start..dst_start +
value_length].copy_from_slice(src_bytes);
+ src_idx += 1;
+ }
+ }
- let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());
+ let mut builder = ArrayDataBuilder::new(truthy.data_type().clone())
+ .len(output_len)
+ .add_buffer(Buffer::from(output));
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
Review Comment:
Please add a `// SAFETY:` comment for this `unsafe` block.
##########
datafusion/physical-expr-common/src/utils.rs:
##########
@@ -188,7 +515,6 @@ mod tests {
let truthy = Arc::new(BooleanArray::from(vec![false, false, false,
true]));
let mask = BooleanArray::from(vec![true, true, false, false, true]);
- // the output array is expected to be the same length as the mask array
Review Comment:
Why was this comment removed?
##########
datafusion/physical-expr-common/src/utils.rs:
##########
@@ -50,43 +62,361 @@ impl ExprPropertiesNode {
}
}
+/// If the mask selects more than this fraction of rows, use
+/// `set_slices()` to copy contiguous ranges. Otherwise iterate
+/// over individual positions using `set_indices()`.
+const SCATTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8;
+
/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`,
next values of `truthy`
/// are taken, when the mask evaluates `false` values null values are filled.
///
/// # Arguments
/// * `mask` - Boolean values used to determine where to put the `truthy`
values
/// * `truthy` - All values of this array are to scatter according to `mask`
into final result.
pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
- let truthy = truthy.to_data();
+ let mask = match mask.null_count() {
+ 0 => Cow::Borrowed(mask),
+ _ => Cow::Owned(prep_null_mask_filter(mask)),
+ };
+
+ let output_len = mask.len();
+ let count = mask.true_count();
+
+ // Fast path: no true values mean all-null object
+ if count == 0 {
+ return Ok(new_null_array(truthy.data_type(), output_len));
+ }
+
+ // Fast path: all true means output = truthy
+ if count == output_len {
+ return Ok(truthy.slice(0, truthy.len()));
+ }
+
+ let selectivity = count as f64 / output_len as f64;
+ let mask_buffer = mask.values();
+
+ scatter_array(truthy, mask_buffer, output_len, selectivity)
+}
+
+fn scatter_array(
+ truthy: &dyn Array,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Result<ArrayRef> {
+ downcast_primitive_array! {
+ truthy => Ok(Arc::new(scatter_primitive(truthy, mask, output_len,
selectivity))),
+ DataType::Boolean => {
+ Ok(Arc::new(scatter_boolean(truthy.as_boolean(), mask, output_len,
selectivity)))
+ }
+ DataType::Utf8 => {
+ Ok(Arc::new(scatter_bytes(truthy.as_string::<i32>(), mask,
output_len, selectivity)))
+ }
+ DataType::LargeUtf8 => {
+ Ok(Arc::new(scatter_bytes(truthy.as_string::<i64>(), mask,
output_len, selectivity)))
+ }
+ DataType::Utf8View => {
+ Ok(Arc::new(scatter_byte_view(truthy.as_string_view(), mask,
output_len, selectivity)))
+ }
+ DataType::Binary => {
+ Ok(Arc::new(scatter_bytes(truthy.as_binary::<i32>(), mask,
output_len, selectivity)))
+ }
+ DataType::LargeBinary => {
+ Ok(Arc::new(scatter_bytes(truthy.as_binary::<i64>(), mask,
output_len, selectivity)))
+ }
+ DataType::BinaryView => {
+ Ok(Arc::new(scatter_byte_view(truthy.as_binary_view(), mask,
output_len, selectivity)))
+ }
+ DataType::FixedSizeBinary(_) => {
+ Ok(Arc::new(scatter_fixed_size_binary(
+ truthy.as_fixed_size_binary(), mask, output_len, selectivity,
+ )))
+ }
+ DataType::Dictionary(_, _) => {
+ downcast_dictionary_array! {
+ truthy => Ok(Arc::new(scatter_dict(truthy, mask, output_len,
selectivity))),
+ _t => scatter_fallback(truthy, mask, output_len)
+ }
+ }
+ _ => scatter_fallback(truthy, mask, output_len)
+ }
+}
+
+#[inline(never)]
+fn scatter_native<T: ArrowNativeType>(
+ src: &[T],
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Buffer {
+ let mut output = vec![T::default(); output_len];
+ let mut src_offset = 0;
+
+ if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
+ for (start, end) in mask.set_slices() {
+ let len = end - start;
+ output[start..end].copy_from_slice(&src[src_offset..src_offset +
len]);
+ src_offset += len;
+ }
+ } else {
+ for dst_idx in mask.set_indices() {
+ output[dst_idx] = src[src_offset];
+ src_offset += 1;
+ }
+ }
+
+ output.into()
+}
+
+fn scatter_bits(
+ src: &BooleanBuffer,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Buffer {
+ let mut builder = BooleanBufferBuilder::new(output_len);
+ builder.advance(output_len);
+ let mut src_offset = 0;
+
+ if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
+ for (start, end) in mask.set_slices() {
+ for i in start..end {
+ if src.value(src_offset) {
+ builder.set_bit(i, true);
+ }
+ src_offset += 1;
+ }
+ }
+ } else {
+ for dst_idx in mask.set_indices() {
+ if src.value(src_offset) {
+ builder.set_bit(dst_idx, true);
+ }
+ src_offset += 1;
+ }
+ }
+
+ builder.finish().into_inner()
+}
+
+fn scatter_null_mask(
+ src_nulls: Option<&NullBuffer>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Option<(usize, Buffer)> {
+ let false_count = output_len - mask.count_set_bits();
+ let src_null_count = src_nulls.map(|n| n.null_count()).unwrap_or(0);
+
+ if src_null_count == 0 {
+ if false_count == 0 {
+ None
+ } else {
+ Some((false_count, mask.inner().clone()))
+ }
+ } else {
+ let src_nulls = src_nulls.unwrap();
+ let scattered = scatter_bits(src_nulls.inner(), mask, output_len,
selectivity);
+ let valid_count = scattered.count_set_bits_offset(0, output_len);
+ let null_count = output_len - valid_count;
+ if null_count == 0 {
+ None
+ } else {
+ Some((null_count, scattered))
+ }
+ }
+}
+
+fn scatter_primitive<T: ArrowPrimitiveType>(
+ truthy: &PrimitiveArray<T>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> PrimitiveArray<T> {
+ let values = scatter_native(truthy.values(), mask, output_len,
selectivity);
+ let mut builder = ArrayDataBuilder::new(truthy.data_type().clone())
+ .len(output_len)
+ .add_buffer(values);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ PrimitiveArray::from(data)
+}
+
+fn scatter_boolean(
+ truthy: &BooleanArray,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> BooleanArray {
+ let values = scatter_bits(truthy.values(), mask, output_len, selectivity);
+ let mut builder = ArrayDataBuilder::new(DataType::Boolean)
+ .len(output_len)
+ .add_buffer(values);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ BooleanArray::from(data)
+}
+
+fn scatter_bytes<T: ByteArrayType>(
+ truthy: &GenericByteArray<T>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> GenericByteArray<T> {
+ let src_offsets = truthy.value_offsets();
+ let src_data = truthy.value_data();
+
+ // Build output offsets: false positions get zero-length (offset stays
same)
+ let mut dst_offsets: Vec<T::Offset> = Vec::with_capacity(output_len + 1);
+ let mut cur_offset = T::Offset::default();
+ dst_offsets.push(cur_offset);
+
+ let mut src_idx = 0;
+ for i in 0..output_len {
+ if mask.value(i) {
+ let len =
+ src_offsets[src_idx + 1].as_usize() -
src_offsets[src_idx].as_usize();
+ cur_offset += T::Offset::from_usize(len).unwrap();
+ src_idx += 1;
+ }
+ dst_offsets.push(cur_offset);
+ }
+
+ let byte_start = src_offsets[0].as_usize();
+ let byte_end = src_offsets[src_idx].as_usize();
+ let dst_data: Buffer = src_data[byte_start..byte_end].into();
+
+ let offsets_buffer: Buffer = dst_offsets.into();
+ let mut builder = ArrayDataBuilder::new(truthy.data_type().clone())
+ .len(output_len)
+ .add_buffer(offsets_buffer)
+ .add_buffer(dst_data);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ GenericByteArray::from(data)
+}
+
+fn scatter_byte_view<T: ByteViewType>(
+ truthy: &GenericByteViewArray<T>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> GenericByteViewArray<T> {
+ let new_views = scatter_native(truthy.views(), mask, output_len,
selectivity);
+
+ let mut builder = ArrayDataBuilder::new(T::DATA_TYPE)
+ .len(output_len)
+ .add_buffer(new_views)
+ .add_buffers(truthy.data_buffers().to_vec());
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ GenericByteViewArray::from(unsafe { builder.build_unchecked() })
+}
+
+fn scatter_fixed_size_binary(
+ truthy: &FixedSizeBinaryArray,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> FixedSizeBinaryArray {
+ let value_length = truthy.value_length() as usize;
+ let mut output = vec![0u8; output_len * value_length];
+ let mut src_idx = 0;
- // update the mask so that any null values become false
- // (SlicesIterator doesn't respect nulls)
- let mask = and_kleene(mask, &is_not_null(mask)?)?;
+ if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
+ for (start, end) in mask.set_slices() {
+ for dst_idx in start..end {
+ let src_bytes = truthy.value(src_idx);
+ let dst_start = dst_idx * value_length;
+ output[dst_start..dst_start +
value_length].copy_from_slice(src_bytes);
+ src_idx += 1;
+ }
+ }
+ } else {
+ for dst_idx in mask.set_indices() {
+ let src_bytes = truthy.value(src_idx);
+ let dst_start = dst_idx * value_length;
+ output[dst_start..dst_start +
value_length].copy_from_slice(src_bytes);
+ src_idx += 1;
+ }
+ }
- let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());
+ let mut builder = ArrayDataBuilder::new(truthy.data_type().clone())
+ .len(output_len)
+ .add_buffer(Buffer::from(output));
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ FixedSizeBinaryArray::from(data)
+}
+
+fn scatter_dict<K: ArrowDictionaryKeyType>(
+ truthy: &DictionaryArray<K>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> DictionaryArray<K> {
+ let scattered_keys = scatter_primitive(truthy.keys(), mask, output_len,
selectivity);
+ let builder = scattered_keys
+ .into_data()
+ .into_builder()
+ .data_type(truthy.data_type().clone())
+ .child_data(vec![truthy.values().to_data()]);
Review Comment:
I don't think you can, or should, do that: you operate on the keys and then
convert the resulting array data into a dictionary, which seems buggy.
All you want is to replace the keys in the dictionary; you can do that with
`DictionaryArray::new`.
##########
datafusion/physical-expr-common/src/utils.rs:
##########
@@ -50,43 +62,361 @@ impl ExprPropertiesNode {
}
}
+/// If the mask selects more than this fraction of rows, use
+/// `set_slices()` to copy contiguous ranges. Otherwise iterate
+/// over individual positions using `set_indices()`.
+const SCATTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8;
+
/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`,
next values of `truthy`
/// are taken, when the mask evaluates `false` values null values are filled.
///
/// # Arguments
/// * `mask` - Boolean values used to determine where to put the `truthy`
values
/// * `truthy` - All values of this array are to scatter according to `mask`
into final result.
pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
- let truthy = truthy.to_data();
+ let mask = match mask.null_count() {
+ 0 => Cow::Borrowed(mask),
+ _ => Cow::Owned(prep_null_mask_filter(mask)),
+ };
+
+ let output_len = mask.len();
+ let count = mask.true_count();
+
+ // Fast path: no true values mean all-null object
+ if count == 0 {
+ return Ok(new_null_array(truthy.data_type(), output_len));
+ }
+
+ // Fast path: all true means output = truthy
+ if count == output_len {
+ return Ok(truthy.slice(0, truthy.len()));
+ }
+
+ let selectivity = count as f64 / output_len as f64;
+ let mask_buffer = mask.values();
+
+ scatter_array(truthy, mask_buffer, output_len, selectivity)
+}
+
+fn scatter_array(
+ truthy: &dyn Array,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Result<ArrayRef> {
+ downcast_primitive_array! {
+ truthy => Ok(Arc::new(scatter_primitive(truthy, mask, output_len,
selectivity))),
+ DataType::Boolean => {
+ Ok(Arc::new(scatter_boolean(truthy.as_boolean(), mask, output_len,
selectivity)))
+ }
+ DataType::Utf8 => {
+ Ok(Arc::new(scatter_bytes(truthy.as_string::<i32>(), mask,
output_len, selectivity)))
+ }
+ DataType::LargeUtf8 => {
+ Ok(Arc::new(scatter_bytes(truthy.as_string::<i64>(), mask,
output_len, selectivity)))
+ }
+ DataType::Utf8View => {
+ Ok(Arc::new(scatter_byte_view(truthy.as_string_view(), mask,
output_len, selectivity)))
+ }
+ DataType::Binary => {
+ Ok(Arc::new(scatter_bytes(truthy.as_binary::<i32>(), mask,
output_len, selectivity)))
+ }
+ DataType::LargeBinary => {
+ Ok(Arc::new(scatter_bytes(truthy.as_binary::<i64>(), mask,
output_len, selectivity)))
+ }
+ DataType::BinaryView => {
+ Ok(Arc::new(scatter_byte_view(truthy.as_binary_view(), mask,
output_len, selectivity)))
+ }
+ DataType::FixedSizeBinary(_) => {
+ Ok(Arc::new(scatter_fixed_size_binary(
+ truthy.as_fixed_size_binary(), mask, output_len, selectivity,
+ )))
+ }
+ DataType::Dictionary(_, _) => {
+ downcast_dictionary_array! {
+ truthy => Ok(Arc::new(scatter_dict(truthy, mask, output_len,
selectivity))),
+ _t => scatter_fallback(truthy, mask, output_len)
+ }
+ }
+ _ => scatter_fallback(truthy, mask, output_len)
+ }
+}
+
+#[inline(never)]
+fn scatter_native<T: ArrowNativeType>(
+ src: &[T],
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Buffer {
+ let mut output = vec![T::default(); output_len];
+ let mut src_offset = 0;
+
+ if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
+ for (start, end) in mask.set_slices() {
+ let len = end - start;
+ output[start..end].copy_from_slice(&src[src_offset..src_offset +
len]);
+ src_offset += len;
+ }
+ } else {
+ for dst_idx in mask.set_indices() {
+ output[dst_idx] = src[src_offset];
+ src_offset += 1;
+ }
+ }
+
+ output.into()
+}
+
+fn scatter_bits(
+ src: &BooleanBuffer,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Buffer {
+ let mut builder = BooleanBufferBuilder::new(output_len);
+ builder.advance(output_len);
+ let mut src_offset = 0;
+
+ if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
+ for (start, end) in mask.set_slices() {
+ for i in start..end {
+ if src.value(src_offset) {
+ builder.set_bit(i, true);
+ }
+ src_offset += 1;
+ }
+ }
+ } else {
+ for dst_idx in mask.set_indices() {
+ if src.value(src_offset) {
+ builder.set_bit(dst_idx, true);
+ }
+ src_offset += 1;
+ }
+ }
+
+ builder.finish().into_inner()
+}
+
+fn scatter_null_mask(
+ src_nulls: Option<&NullBuffer>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> Option<(usize, Buffer)> {
+ let false_count = output_len - mask.count_set_bits();
+ let src_null_count = src_nulls.map(|n| n.null_count()).unwrap_or(0);
+
+ if src_null_count == 0 {
+ if false_count == 0 {
+ None
+ } else {
+ Some((false_count, mask.inner().clone()))
+ }
+ } else {
+ let src_nulls = src_nulls.unwrap();
+ let scattered = scatter_bits(src_nulls.inner(), mask, output_len,
selectivity);
+ let valid_count = scattered.count_set_bits_offset(0, output_len);
+ let null_count = output_len - valid_count;
+ if null_count == 0 {
+ None
+ } else {
+ Some((null_count, scattered))
+ }
+ }
+}
+
+fn scatter_primitive<T: ArrowPrimitiveType>(
+ truthy: &PrimitiveArray<T>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> PrimitiveArray<T> {
+ let values = scatter_native(truthy.values(), mask, output_len,
selectivity);
+ let mut builder = ArrayDataBuilder::new(truthy.data_type().clone())
+ .len(output_len)
+ .add_buffer(values);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ PrimitiveArray::from(data)
+}
+
+fn scatter_boolean(
+ truthy: &BooleanArray,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> BooleanArray {
+ let values = scatter_bits(truthy.values(), mask, output_len, selectivity);
+ let mut builder = ArrayDataBuilder::new(DataType::Boolean)
+ .len(output_len)
+ .add_buffer(values);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ BooleanArray::from(data)
+}
+
+fn scatter_bytes<T: ByteArrayType>(
+ truthy: &GenericByteArray<T>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> GenericByteArray<T> {
+ let src_offsets = truthy.value_offsets();
+ let src_data = truthy.value_data();
+
+ // Build output offsets: false positions get zero-length (offset stays
same)
+ let mut dst_offsets: Vec<T::Offset> = Vec::with_capacity(output_len + 1);
+ let mut cur_offset = T::Offset::default();
+ dst_offsets.push(cur_offset);
+
+ let mut src_idx = 0;
+ for i in 0..output_len {
+ if mask.value(i) {
+ let len =
+ src_offsets[src_idx + 1].as_usize() -
src_offsets[src_idx].as_usize();
+ cur_offset += T::Offset::from_usize(len).unwrap();
+ src_idx += 1;
+ }
+ dst_offsets.push(cur_offset);
+ }
+
+ let byte_start = src_offsets[0].as_usize();
+ let byte_end = src_offsets[src_idx].as_usize();
+ let dst_data: Buffer = src_data[byte_start..byte_end].into();
+
+ let offsets_buffer: Buffer = dst_offsets.into();
+ let mut builder = ArrayDataBuilder::new(truthy.data_type().clone())
+ .len(output_len)
+ .add_buffer(offsets_buffer)
+ .add_buffer(dst_data);
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ GenericByteArray::from(data)
+}
+
+fn scatter_byte_view<T: ByteViewType>(
+ truthy: &GenericByteViewArray<T>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> GenericByteViewArray<T> {
+ let new_views = scatter_native(truthy.views(), mask, output_len,
selectivity);
+
+ let mut builder = ArrayDataBuilder::new(T::DATA_TYPE)
+ .len(output_len)
+ .add_buffer(new_views)
+ .add_buffers(truthy.data_buffers().to_vec());
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ GenericByteViewArray::from(unsafe { builder.build_unchecked() })
+}
+
+fn scatter_fixed_size_binary(
+ truthy: &FixedSizeBinaryArray,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> FixedSizeBinaryArray {
+ let value_length = truthy.value_length() as usize;
+ let mut output = vec![0u8; output_len * value_length];
+ let mut src_idx = 0;
- // update the mask so that any null values become false
- // (SlicesIterator doesn't respect nulls)
- let mask = and_kleene(mask, &is_not_null(mask)?)?;
+ if selectivity > SCATTER_SLICES_SELECTIVITY_THRESHOLD {
+ for (start, end) in mask.set_slices() {
+ for dst_idx in start..end {
+ let src_bytes = truthy.value(src_idx);
+ let dst_start = dst_idx * value_length;
+ output[dst_start..dst_start +
value_length].copy_from_slice(src_bytes);
+ src_idx += 1;
+ }
+ }
+ } else {
+ for dst_idx in mask.set_indices() {
+ let src_bytes = truthy.value(src_idx);
+ let dst_start = dst_idx * value_length;
+ output[dst_start..dst_start +
value_length].copy_from_slice(src_bytes);
+ src_idx += 1;
+ }
+ }
- let mut mutable = MutableArrayData::new(vec![&truthy], true, mask.len());
+ let mut builder = ArrayDataBuilder::new(truthy.data_type().clone())
+ .len(output_len)
+ .add_buffer(Buffer::from(output));
+
+ if let Some((null_count, nulls)) =
+ scatter_null_mask(truthy.nulls(), mask, output_len, selectivity)
+ {
+ builder = builder.null_count(null_count).null_bit_buffer(Some(nulls));
+ }
+
+ let data = unsafe { builder.build_unchecked() };
+ FixedSizeBinaryArray::from(data)
+}
+
+fn scatter_dict<K: ArrowDictionaryKeyType>(
+ truthy: &DictionaryArray<K>,
+ mask: &BooleanBuffer,
+ output_len: usize,
+ selectivity: f64,
+) -> DictionaryArray<K> {
+ let scattered_keys = scatter_primitive(truthy.keys(), mask, output_len,
selectivity);
+ let builder = scattered_keys
+ .into_data()
+ .into_builder()
+ .data_type(truthy.data_type().clone())
+ .child_data(vec![truthy.values().to_data()]);
+ DictionaryArray::from(unsafe { builder.build_unchecked() })
+}
- // the SlicesIterator slices only the true values. So the gaps left by
this iterator we need to
- // fill with falsy values
+fn scatter_fallback(
+ truthy: &dyn Array,
+ mask: &BooleanBuffer,
+ output_len: usize,
+) -> Result<ArrayRef> {
+ let truthy_data = truthy.to_data();
+ let mut mutable = MutableArrayData::new(vec![&truthy_data], true,
output_len);
- // keep track of how much is filled
let mut filled = 0;
- // keep track of current position we have in truthy array
let mut true_pos = 0;
- SlicesIterator::new(&mask).for_each(|(start, end)| {
- // the gap needs to be filled with nulls
+ let mask_array = BooleanArray::new(mask.clone(), None);
+ SlicesIterator::new(&mask_array).for_each(|(start, end)| {
if start > filled {
mutable.extend_nulls(start - filled);
}
- // fill with truthy values
let len = end - start;
mutable.extend(0, true_pos, true_pos + len);
true_pos += len;
filled = end;
});
- // the remaining part is falsy
- if filled < mask.len() {
- mutable.extend_nulls(mask.len() - filled);
+
+ if filled < output_len {
+ mutable.extend_nulls(output_len - filled);
}
Review Comment:
Why were these comments removed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]