This is an automated email from the ASF dual-hosted git repository.
kontinuation pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new f251ab4a chore(rust/sedona-spatial-join): Evaluate spatial predicate
operands in EvaluateOperandBatchStream (#521)
f251ab4a is described below
commit f251ab4aedddc95014c33180635ac888659895ee
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Fri Jan 16 22:57:52 2026 +0800
chore(rust/sedona-spatial-join): Evaluate spatial predicate operands in
EvaluateOperandBatchStream (#521)
This refactor makes `BuildSideBatchesCollector` and `SpatialJoinStream`
work with streams of `EvaluatedBatch`es instead of directly with streams of
`RecordBatch`es. We will have `EvaluatedBatch`es read directly by spill readers,
so adding this layer of abstraction makes the main part of the spatial join care
less about whether the stream comes directly from the source or is read from
spill files.
The stream for the build side automatically compacts batches to avoid
holding large sparse binary view arrays in memory. The
`EvaluateOperandBatchStream` performs this batch compaction before
evaluating the operands of spatial predicates.
---
rust/sedona-spatial-join/src/build_index.rs | 21 +-
.../src/evaluated_batch/evaluated_batch_stream.rs | 8 +
.../evaluated_batch_stream/evaluate.rs | 217 ++++++++++++++
.../evaluated_batch_stream/in_mem.rs | 10 +-
.../src/index/build_side_collector.rs | 70 ++++-
rust/sedona-spatial-join/src/stream.rs | 27 +-
rust/sedona-spatial-join/src/utils/arrow_utils.rs | 318 ++++++++++++++++++++-
7 files changed, 620 insertions(+), 51 deletions(-)
diff --git a/rust/sedona-spatial-join/src/build_index.rs
b/rust/sedona-spatial-join/src/build_index.rs
index 3600d294..f369365c 100644
--- a/rust/sedona-spatial-join/src/build_index.rs
+++ b/rust/sedona-spatial-join/src/build_index.rs
@@ -71,24 +71,9 @@ pub async fn build_index(
collect_metrics_vec.push(CollectBuildSideMetrics::new(k, &metrics));
}
- let build_partitions = if concurrent {
- // Collect partitions concurrently using collect_all which spawns tasks
- collector
- .collect_all(build_streams, reservations, collect_metrics_vec)
- .await?
- } else {
- // Collect partitions sequentially (for JNI/embedded contexts)
- let mut partitions = Vec::with_capacity(num_partitions);
- for ((stream, reservation), metrics) in build_streams
- .into_iter()
- .zip(reservations)
- .zip(&collect_metrics_vec)
- {
- let partition = collector.collect(stream, reservation,
metrics).await?;
- partitions.push(partition);
- }
- partitions
- };
+ let build_partitions = collector
+ .collect_all(build_streams, reservations, collect_metrics_vec,
concurrent)
+ .await?;
let contains_external_stream = build_partitions
.iter()
diff --git
a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs
b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs
index 958087f7..eb1f855c 100644
--- a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs
+++ b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs
@@ -17,6 +17,7 @@
use std::pin::Pin;
+use arrow_schema::SchemaRef;
use futures::Stream;
use crate::evaluated_batch::EvaluatedBatch;
@@ -27,8 +28,15 @@ use datafusion_common::Result;
pub(crate) trait EvaluatedBatchStream: Stream<Item = Result<EvaluatedBatch>> {
/// Returns true if this stream is an external stream, where batch data
were spilled to disk.
fn is_external(&self) -> bool;
+
+ /// Returns the schema of records produced by this `EvaluatedBatchStream`.
+ ///
+ /// Implementation of this trait should guarantee that all
`EvaluatedBatch`'s returned by this
+ /// stream should have the same schema as returned from this method.
+ fn schema(&self) -> SchemaRef;
}
pub(crate) type SendableEvaluatedBatchStream = Pin<Box<dyn
EvaluatedBatchStream + Send>>;
+pub(crate) mod evaluate;
pub(crate) mod in_mem;
diff --git
a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs
b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs
new file mode 100644
index 00000000..0baf3c5f
--- /dev/null
+++
b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow_array::RecordBatch;
+use arrow_schema::{DataType, SchemaRef};
+use datafusion_common::Result;
+use datafusion_physical_plan::{metrics, SendableRecordBatchStream};
+use futures::{Stream, StreamExt};
+
+use crate::evaluated_batch::{
+ evaluated_batch_stream::{EvaluatedBatchStream,
SendableEvaluatedBatchStream},
+ EvaluatedBatch,
+};
+use crate::operand_evaluator::{EvaluatedGeometryArray, OperandEvaluator};
+use crate::utils::arrow_utils::compact_batch;
+
+/// An evaluator that can evaluate geometry expressions on record batches
+/// and produces evaluated geometry arrays.
+trait Evaluator: Unpin {
+ fn evaluate(&self, batch: &RecordBatch) -> Result<EvaluatedGeometryArray>;
+}
+
+/// An evaluator for build-side geometry expressions.
+struct BuildSideEvaluator {
+ evaluator: Arc<dyn OperandEvaluator>,
+}
+
+impl Evaluator for BuildSideEvaluator {
+ fn evaluate(&self, batch: &RecordBatch) -> Result<EvaluatedGeometryArray> {
+ self.evaluator.evaluate_build(batch)
+ }
+}
+
+/// An evaluator for probe-side geometry expressions.
+struct ProbeSideEvaluator {
+ evaluator: Arc<dyn OperandEvaluator>,
+}
+
+impl Evaluator for ProbeSideEvaluator {
+ fn evaluate(&self, batch: &RecordBatch) -> Result<EvaluatedGeometryArray> {
+ self.evaluator.evaluate_probe(batch)
+ }
+}
+
+/// Wraps a `SendableRecordBatchStream` and evaluates the probe-side geometry
+/// expression eagerly so downstream consumers can operate on
`EvaluatedBatch`s.
+struct EvaluateOperandBatchStream<E: Evaluator> {
+ inner: SendableRecordBatchStream,
+ evaluator: E,
+ evaluation_time: metrics::Time,
+ gc_view_arrays: bool,
+}
+
+impl<E: Evaluator> EvaluateOperandBatchStream<E> {
+ fn new(
+ inner: SendableRecordBatchStream,
+ evaluator: E,
+ evaluation_time: metrics::Time,
+ gc_view_arrays: bool,
+ ) -> Self {
+ let gc_view_arrays = gc_view_arrays &&
schema_contains_view_types(&inner.schema());
+ Self {
+ inner,
+ evaluator,
+ evaluation_time,
+ gc_view_arrays,
+ }
+ }
+}
+
+/// Checks if the schema contains any view types (Utf8View or BinaryView).
+fn schema_contains_view_types(schema: &SchemaRef) -> bool {
+ schema
+ .flattened_fields()
+ .iter()
+ .any(|field| matches!(field.data_type(), DataType::Utf8View |
DataType::BinaryView))
+}
+
+impl<E: Evaluator> EvaluatedBatchStream for EvaluateOperandBatchStream<E> {
+ fn is_external(&self) -> bool {
+ false
+ }
+
+ fn schema(&self) -> arrow_schema::SchemaRef {
+ self.inner.schema()
+ }
+}
+
+impl<E: Evaluator> Stream for EvaluateOperandBatchStream<E> {
+ type Item = Result<EvaluatedBatch>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
+ let self_mut = self.get_mut();
+ match self_mut.inner.poll_next_unpin(cx) {
+ Poll::Ready(Some(Ok(batch))) => {
+ let _timer = self_mut.evaluation_time.timer();
+ let batch = if self_mut.gc_view_arrays {
+ compact_batch(batch)?
+ } else {
+ batch
+ };
+ let geom_array = self_mut.evaluator.evaluate(&batch)?;
+ let evaluated = EvaluatedBatch { batch, geom_array };
+ Poll::Ready(Some(Ok(evaluated)))
+ }
+ Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
+ Poll::Ready(None) => Poll::Ready(None),
+ Poll::Pending => Poll::Pending,
+ }
+ }
+}
+
+/// Returns a `SendableEvaluatedBatchStream` that eagerly evaluates the
build-side
+/// geometry expression for every incoming `RecordBatch`.
+pub(crate) fn create_evaluated_build_stream(
+ stream: SendableRecordBatchStream,
+ evaluator: Arc<dyn OperandEvaluator>,
+ evaluation_time: metrics::Time,
+) -> SendableEvaluatedBatchStream {
+ // Enable gc_view_arrays for build-side since build-side batches needs to
be long-lived
+ // in memory during the join process. Poorly managed sparse view arrays
could lead to
+ // unnecessary high memory usage or excessive spilling.
+ Box::pin(EvaluateOperandBatchStream::new(
+ stream,
+ BuildSideEvaluator { evaluator },
+ evaluation_time,
+ true,
+ ))
+}
+
+/// Returns a `SendableEvaluatedBatchStream` that eagerly evaluates the
probe-side
+/// geometry expression for every incoming `RecordBatch`.
+pub(crate) fn create_evaluated_probe_stream(
+ stream: SendableRecordBatchStream,
+ evaluator: Arc<dyn OperandEvaluator>,
+ evaluation_time: metrics::Time,
+) -> SendableEvaluatedBatchStream {
+ Box::pin(EvaluateOperandBatchStream::new(
+ stream,
+ ProbeSideEvaluator { evaluator },
+ evaluation_time,
+ false,
+ ))
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
+
+ use super::schema_contains_view_types;
+
+ fn schema(fields: Vec<Field>) -> SchemaRef {
+ Arc::new(Schema::new(fields))
+ }
+
+ #[test]
+ fn test_schema_contains_view_types_top_level() {
+ let schema_ref = schema(vec![
+ Field::new("a", DataType::Utf8View, true),
+ Field::new("b", DataType::BinaryView, true),
+ ]);
+
+ assert!(schema_contains_view_types(&schema_ref));
+
+ // Similar shape but without view types
+ let schema_no_view = schema(vec![
+ Field::new("a", DataType::Utf8, true),
+ Field::new("b", DataType::Binary, true),
+ ]);
+ assert!(!schema_contains_view_types(&schema_no_view));
+ }
+
+ #[test]
+ fn test_schema_contains_view_types_nested() {
+ let nested = Field::new(
+ "s",
+ DataType::Struct(Fields::from(vec![Field::new(
+ "v",
+ DataType::Utf8View,
+ true,
+ )])),
+ true,
+ );
+
+ let schema_ref = schema(vec![nested]);
+ assert!(schema_contains_view_types(&schema_ref));
+
+ // Nested struct without any view types
+ let nested_no_view = Field::new(
+ "s",
+ DataType::Struct(Fields::from(vec![Field::new("v", DataType::Utf8,
true)])),
+ true,
+ );
+ let schema_no_view = schema(vec![nested_no_view]);
+ assert!(!schema_contains_view_types(&schema_no_view));
+ }
+}
diff --git
a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs
b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs
index 57671547..550e308a 100644
---
a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs
+++
b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs
@@ -17,21 +17,25 @@
use std::{
pin::Pin,
+ sync::Arc,
task::{Context, Poll},
vec::IntoIter,
};
+use arrow_schema::SchemaRef;
use datafusion_common::Result;
use crate::evaluated_batch::{evaluated_batch_stream::EvaluatedBatchStream,
EvaluatedBatch};
pub(crate) struct InMemoryEvaluatedBatchStream {
+ schema: SchemaRef,
iter: IntoIter<EvaluatedBatch>,
}
impl InMemoryEvaluatedBatchStream {
- pub fn new(batches: Vec<EvaluatedBatch>) -> Self {
+ pub fn new(schema: SchemaRef, batches: Vec<EvaluatedBatch>) -> Self {
InMemoryEvaluatedBatchStream {
+ schema,
iter: batches.into_iter(),
}
}
@@ -41,6 +45,10 @@ impl EvaluatedBatchStream for InMemoryEvaluatedBatchStream {
fn is_external(&self) -> bool {
false
}
+
+ fn schema(&self) -> arrow_schema::SchemaRef {
+ Arc::clone(&self.schema)
+ }
}
impl futures::Stream for InMemoryEvaluatedBatchStream {
diff --git a/rust/sedona-spatial-join/src/index/build_side_collector.rs
b/rust/sedona-spatial-join/src/index/build_side_collector.rs
index ab3e8e27..a1643834 100644
--- a/rust/sedona-spatial-join/src/index/build_side_collector.rs
+++ b/rust/sedona-spatial-join/src/index/build_side_collector.rs
@@ -29,7 +29,8 @@ use sedona_schema::datatypes::WKB_GEOMETRY;
use crate::{
evaluated_batch::{
evaluated_batch_stream::{
- in_mem::InMemoryEvaluatedBatchStream, SendableEvaluatedBatchStream,
+ evaluate::create_evaluated_build_stream,
in_mem::InMemoryEvaluatedBatchStream,
+ SendableEvaluatedBatchStream,
},
EvaluatedBatch,
},
@@ -88,30 +89,23 @@ impl BuildSideBatchesCollector {
pub async fn collect(
&self,
- mut stream: SendableRecordBatchStream,
+ mut stream: SendableEvaluatedBatchStream,
mut reservation: MemoryReservation,
metrics: &CollectBuildSideMetrics,
) -> Result<BuildPartition> {
- let evaluator = self.evaluator.as_ref();
let mut in_mem_batches: Vec<EvaluatedBatch> = Vec::new();
let mut analyzer = AnalyzeAccumulator::new(WKB_GEOMETRY, WKB_GEOMETRY);
- while let Some(record_batch) = stream.next().await {
- let record_batch = record_batch?;
+ while let Some(evaluated_batch) = stream.next().await {
+ let build_side_batch = evaluated_batch?;
let _timer = metrics.time_taken.timer();
// Process the record batch and create a BuildSideBatch
- let geom_array = evaluator.evaluate_build(&record_batch)?;
-
+ let geom_array = &build_side_batch.geom_array;
for wkb in geom_array.wkbs().iter().flatten() {
analyzer.update_statistics(wkb, wkb.buf().len())?;
}
- let build_side_batch = EvaluatedBatch {
- batch: record_batch,
- geom_array,
- };
-
let in_mem_size = build_side_batch.in_mem_size()?;
metrics.num_batches.add(1);
metrics.num_rows.add(build_side_batch.num_rows());
@@ -122,7 +116,10 @@ impl BuildSideBatchesCollector {
}
Ok(BuildPartition {
- build_side_batch_stream:
Box::pin(InMemoryEvaluatedBatchStream::new(in_mem_batches)),
+ build_side_batch_stream:
Box::pin(InMemoryEvaluatedBatchStream::new(
+ stream.schema(),
+ in_mem_batches,
+ )),
geo_statistics: analyzer.finish(),
reservation,
})
@@ -133,12 +130,28 @@ impl BuildSideBatchesCollector {
streams: Vec<SendableRecordBatchStream>,
reservations: Vec<MemoryReservation>,
metrics_vec: Vec<CollectBuildSideMetrics>,
+ concurrent: bool,
) -> Result<Vec<BuildPartition>> {
if streams.is_empty() {
return Ok(vec![]);
}
- // Spawn all tasks to scan all build streams concurrently
+ if concurrent {
+ self.collect_all_concurrently(streams, reservations, metrics_vec)
+ .await
+ } else {
+ self.collect_all_sequential(streams, reservations, metrics_vec)
+ .await
+ }
+ }
+
+ async fn collect_all_concurrently(
+ &self,
+ streams: Vec<SendableRecordBatchStream>,
+ reservations: Vec<MemoryReservation>,
+ metrics_vec: Vec<CollectBuildSideMetrics>,
+ ) -> Result<Vec<BuildPartition>> {
+ // Spawn a task for each stream to scan all streams concurrently
let mut join_set = JoinSet::new();
for (partition_id, ((stream, metrics), reservation)) in streams
.into_iter()
@@ -147,8 +160,13 @@ impl BuildSideBatchesCollector {
.enumerate()
{
let collector = self.clone();
+ let evaluator = Arc::clone(&self.evaluator);
join_set.spawn(async move {
- let result = collector.collect(stream, reservation,
&metrics).await;
+ let evaluated_stream =
+ create_evaluated_build_stream(stream, evaluator,
metrics.time_taken.clone());
+ let result = collector
+ .collect(evaluated_stream, reservation, &metrics)
+ .await;
(partition_id, result)
});
}
@@ -168,4 +186,26 @@ impl BuildSideBatchesCollector {
Ok(partitions.into_iter().map(|v| v.unwrap()).collect())
}
+
+ async fn collect_all_sequential(
+ &self,
+ streams: Vec<SendableRecordBatchStream>,
+ reservations: Vec<MemoryReservation>,
+ metrics_vec: Vec<CollectBuildSideMetrics>,
+ ) -> Result<Vec<BuildPartition>> {
+ // Collect partitions sequentially (for JNI/embedded contexts)
+ let mut results = Vec::with_capacity(streams.len());
+ for ((stream, metrics), reservation) in
+ streams.into_iter().zip(metrics_vec).zip(reservations)
+ {
+ let evaluator = Arc::clone(&self.evaluator);
+ let evaluated_stream =
+ create_evaluated_build_stream(stream, evaluator,
metrics.time_taken.clone());
+ let result = self
+ .collect(evaluated_stream, reservation, &metrics)
+ .await?;
+ results.push(result);
+ }
+ Ok(results)
+ }
}
diff --git a/rust/sedona-spatial-join/src/stream.rs
b/rust/sedona-spatial-join/src/stream.rs
index 37a84523..f4b18244 100644
--- a/rust/sedona-spatial-join/src/stream.rs
+++ b/rust/sedona-spatial-join/src/stream.rs
@@ -33,9 +33,11 @@ use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;
+use
crate::evaluated_batch::evaluated_batch_stream::evaluate::create_evaluated_probe_stream;
+use
crate::evaluated_batch::evaluated_batch_stream::SendableEvaluatedBatchStream;
use crate::evaluated_batch::EvaluatedBatch;
use crate::index::SpatialIndex;
-use crate::operand_evaluator::{create_operand_evaluator, distance_value_at,
OperandEvaluator};
+use crate::operand_evaluator::{create_operand_evaluator, distance_value_at};
use crate::spatial_predicate::SpatialPredicate;
use crate::utils::join_utils::{
adjust_indices_by_join_type, apply_join_filter_to_indices,
build_batch_from_indices,
@@ -55,7 +57,7 @@ pub(crate) struct SpatialJoinStream {
/// type of the join
join_type: JoinType,
/// The stream of the probe side
- probe_stream: SendableRecordBatchStream,
+ probe_stream: SendableEvaluatedBatchStream,
/// Information of index and left / right placement of columns
column_indices: Vec<ColumnIndex>,
/// Maintains the order of the probe side
@@ -76,8 +78,6 @@ pub(crate) struct SpatialJoinStream {
once_async_spatial_index: Arc<Mutex<Option<OnceAsync<SpatialIndex>>>>,
/// The spatial index
spatial_index: Option<Arc<SpatialIndex>>,
- /// The `on` spatial predicate evaluator
- evaluator: Arc<dyn OperandEvaluator>,
/// The spatial predicate being evaluated
spatial_predicate: SpatialPredicate,
}
@@ -99,6 +99,11 @@ impl SpatialJoinStream {
once_async_spatial_index: Arc<Mutex<Option<OnceAsync<SpatialIndex>>>>,
) -> Self {
let evaluator = create_operand_evaluator(on, options.clone());
+ let probe_stream = create_evaluated_probe_stream(
+ probe_stream,
+ Arc::clone(&evaluator),
+ join_metrics.join_time.clone(),
+ );
Self {
schema,
filter,
@@ -113,7 +118,6 @@ impl SpatialJoinStream {
once_fut_spatial_index,
once_async_spatial_index,
spatial_index: None,
- evaluator,
spatial_predicate: on.clone(),
}
}
@@ -386,9 +390,9 @@ impl SpatialJoinStream {
fn create_spatial_join_iterator(
&self,
- probe_batch: RecordBatch,
+ probe_evaluated_batch: EvaluatedBatch,
) -> Result<SpatialJoinBatchIterator> {
- let num_rows = probe_batch.num_rows();
+ let num_rows = probe_evaluated_batch.num_rows();
self.join_metrics.probe_input_batches.add(1);
self.join_metrics.probe_input_rows.add(num_rows);
@@ -398,13 +402,11 @@ impl SpatialJoinStream {
.as_ref()
.expect("Spatial index should be available");
- // Evaluate the probe side geometry expression to get geometry array
- let geom_array = self.evaluator.evaluate_probe(&probe_batch)?;
-
// Update the probe side statistics, which may help the spatial index
to select a better
// execution mode for evaluating the spatial predicate.
if spatial_index.need_more_probe_stats() {
let mut analyzer = AnalyzeAccumulator::new(WKB_GEOMETRY,
WKB_GEOMETRY);
+ let geom_array = &probe_evaluated_batch.geom_array;
for wkb in geom_array.wkbs().iter().flatten() {
analyzer.update_statistics(wkb, wkb.buf().len())?;
}
@@ -414,10 +416,7 @@ impl SpatialJoinStream {
SpatialJoinBatchIterator::new(SpatialJoinBatchIteratorParams {
spatial_index: spatial_index.clone(),
- probe_evaluated_batch: EvaluatedBatch {
- batch: probe_batch,
- geom_array,
- },
+ probe_evaluated_batch,
join_metrics: self.join_metrics.clone(),
max_batch_size: self.target_output_batch_size,
probe_side_ordered: self.probe_side_ordered,
diff --git a/rust/sedona-spatial-join/src/utils/arrow_utils.rs
b/rust/sedona-spatial-join/src/utils/arrow_utils.rs
index c8c5779b..367568fc 100644
--- a/rust/sedona-spatial-join/src/utils/arrow_utils.rs
+++ b/rust/sedona-spatial-join/src/utils/arrow_utils.rs
@@ -15,10 +15,149 @@
// specific language governing permissions and limitations
// under the License.
-use arrow::array::{Array, ArrayData, RecordBatch};
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayData, BinaryViewArray, ListArray, RecordBatch,
StringViewArray};
+use arrow_array::make_array;
use arrow_array::ArrayRef;
+use arrow_array::StructArray;
use arrow_schema::{ArrowError, DataType};
use datafusion_common::Result;
+use sedona_common::sedona_internal_err;
+
+/// Reconstruct `batch` to organize the payload buffers of each
`StringViewArray` and
+/// `BinaryViewArray` in sequential order by calling `gc()` on them.
+///
+/// Note this is a workaround until
<https://github.com/apache/arrow-rs/issues/7185> is
+/// available.
+///
+/// # Rationale
+///
+/// The `interleave` kernel does not reconstruct the inner buffers of view
arrays by default,
+/// leading to non-sequential payload locations. A single payload buffer might
be shared by
+/// multiple `RecordBatch`es or multiple rows in the same batch might
reference scattered
+/// locations in a large buffer.
+///
+/// When writing each batch to disk, the writer has to write all referenced
buffers. This
+/// causes extra disk reads and writes, and potentially execution failure
(e.g. No space left
+/// on device).
+///
+/// # Example
+///
+/// Before interleaving:
+/// batch1 -> buffer1 (large)
+/// batch2 -> buffer2 (large)
+///
+/// interleaved_batch -> buffer1 (sparse access)
+/// -> buffer2 (sparse access)
+///
+/// Then when spilling the interleaved batch, the writer has to write both
buffer1 and buffer2
+/// entirely, even if only a few bytes are used.
+pub(crate) fn compact_batch(batch: RecordBatch) -> Result<RecordBatch> {
+ let mut new_columns: Vec<Arc<dyn Array>> =
Vec::with_capacity(batch.num_columns());
+ let mut arr_mutated = false;
+
+ for array in batch.columns() {
+ let (new_array, mutated) = compact_array(Arc::clone(array))?;
+ new_columns.push(new_array);
+ arr_mutated |= mutated;
+ }
+
+ if arr_mutated {
+ Ok(RecordBatch::try_new(batch.schema(), new_columns)?)
+ } else {
+ Ok(batch)
+ }
+}
+
+/// Recursively compacts view arrays in `array` by calling `gc()` on them.
+/// Returns a tuple of the potentially new array and a boolean indicating
+/// whether any compaction was performed.
+pub(crate) fn compact_array(array: ArrayRef) -> Result<(ArrayRef, bool)> {
+ if let Some(view_array) = array.as_any().downcast_ref::<StringViewArray>()
{
+ return Ok((Arc::new(view_array.gc()), true));
+ }
+ if let Some(view_array) = array.as_any().downcast_ref::<BinaryViewArray>()
{
+ return Ok((Arc::new(view_array.gc()), true));
+ }
+
+ // Fast path for non-nested arrays
+ if !array.data_type().is_nested() {
+ return Ok((array, false));
+ }
+
+ // Avoid ArrayData -> ArrayRef roundtrips for commonly used data types,
+ // including StructArray and ListArray.
+
+ if let Some(struct_array) = array.as_any().downcast_ref::<StructArray>() {
+ let mut mutated = false;
+ let mut new_columns: Vec<ArrayRef> =
Vec::with_capacity(struct_array.num_columns());
+ for col in struct_array.columns() {
+ let (new_col, col_mutated) = compact_array(Arc::clone(col))?;
+ mutated |= col_mutated;
+ new_columns.push(new_col);
+ }
+
+ if !mutated {
+ return Ok((array, false));
+ }
+
+ let rebuilt = StructArray::new(
+ struct_array.fields().clone(),
+ new_columns,
+ struct_array.nulls().cloned(),
+ );
+ return Ok((Arc::new(rebuilt), true));
+ }
+
+ if let Some(list_array) = array.as_any().downcast_ref::<ListArray>() {
+ let (new_values, mutated) =
compact_array(list_array.values().clone())?;
+ if !mutated {
+ return Ok((array, false));
+ }
+
+ let DataType::List(field) = list_array.data_type() else {
+ // Defensive: this downcast should only succeed for DataType::List.
+ return sedona_internal_err!(
+ "ListArray has non-List data type: {:?}",
+ list_array.data_type()
+ );
+ };
+
+ let rebuilt = ListArray::new(
+ Arc::clone(field),
+ list_array.offsets().clone(),
+ new_values,
+ list_array.nulls().cloned(),
+ );
+ return Ok((Arc::new(rebuilt), true));
+ }
+
+ // For nested arrays (Map/Dictionary/etc.), recurse into children via
ArrayData.
+ let data = array.to_data();
+ if data.child_data().is_empty() {
+ return Ok((array, false));
+ }
+
+ let mut mutated = false;
+ let mut new_child_data = Vec::with_capacity(data.child_data().len());
+ for child in data.child_data().iter() {
+ let child_array = make_array(child.clone());
+ let (new_child_array, child_mutated) = compact_array(child_array)?;
+ mutated |= child_mutated;
+ new_child_data.push(new_child_array.to_data());
+ }
+
+ if !mutated {
+ return Ok((array, false));
+ }
+
+ // Rebuild this array with identical buffers/nulls but replaced child_data.
+ let mut builder = data.into_builder();
+ builder = builder.child_data(new_child_data);
+ let new_data = builder.build()?;
+ Ok((make_array(new_data), true))
+}
/// Estimate the in-memory size of a given RecordBatch. This function
estimates the
/// size as if the underlying buffers were copied to somewhere else and not
shared.
@@ -101,10 +240,13 @@ fn get_binary_view_value_size(array_data: &ArrayData) ->
Result<usize, ArrowErro
#[cfg(test)]
mod tests {
use super::*;
+ use arrow::array::StringViewBuilder;
use arrow_array::builder::{BinaryViewBuilder, ListBuilder};
use arrow_array::types::Int32Type;
- use arrow_array::{BinaryViewArray, ListArray, StringViewArray,
StructArray};
- use arrow_schema::{DataType, Field};
+ use arrow_array::{
+ BinaryViewArray, BooleanArray, ListArray, StringArray,
StringViewArray, StructArray,
+ };
+ use arrow_schema::{DataType, Field, Schema};
use std::sync::Arc;
#[test]
@@ -288,4 +430,174 @@ mod tests {
expected_bv_slice1 + expected_i32_slice1
);
}
+
+ #[test]
+ fn test_compact_batch_recurses_into_struct() {
+ let n = 256;
+ let long = "x".repeat(2048);
+
+ let mut builder = StringViewBuilder::with_capacity(n);
+ for i in 0..n {
+ builder.append_value(format!("batch0_{i}_{long}"));
+ }
+ let string_view_array: ArrayRef = Arc::new(builder.finish());
+ let boolean_array: ArrayRef = Arc::new(BooleanArray::from(vec![true;
n]));
+ let struct_fields = vec![
+ Arc::new(Field::new("a", DataType::Utf8View, false)),
+ Arc::new(Field::new("b", DataType::Boolean, false)),
+ ];
+ let struct_array = StructArray::from(vec![
+ (
+ Arc::clone(&struct_fields[0]),
+ Arc::clone(&string_view_array),
+ ),
+ (Arc::clone(&struct_fields[1]), Arc::clone(&boolean_array)),
+ ]);
+
+ let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new(
+ "s",
+ DataType::Struct(struct_fields.into()),
+ false,
+ )]));
+ let batch0 = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![Arc::new(struct_array) as ArrayRef],
+ )
+ .unwrap();
+ let sliced = batch0.slice(0, 1);
+
+ let before = sliced.get_array_memory_size();
+ let compacted = compact_batch(sliced.clone()).unwrap();
+ let after = compacted.get_array_memory_size();
+
+ assert_eq!(sliced.schema(), compacted.schema());
+ assert_eq!(sliced.num_rows(), compacted.num_rows());
+ assert!(
+ after < before,
+ "expected compaction to reduce memory: before={before},
after={after}"
+ );
+ }
+
+ #[test]
+ fn test_compact_batch_without_view_returns_input_as_is() {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, true),
+ Field::new("flag", DataType::Boolean, true),
+ ]));
+
+ let ids: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![1, 2,
3]));
+ let names: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), None,
Some("c")]));
+ let flags: ArrayRef = Arc::new(BooleanArray::from(vec![Some(true),
None, Some(false)]));
+
+ let batch = RecordBatch::try_new(Arc::clone(&schema), vec![ids, names,
flags]).unwrap();
+ let original = batch.clone();
+
+ let compacted = compact_batch(batch).unwrap();
+
+ // A no-op compaction should preserve the exact schema/column Arcs.
+ assert!(Arc::ptr_eq(&original.schema(), &compacted.schema()));
+ for i in 0..original.num_columns() {
+ assert!(Arc::ptr_eq(original.column(i), compacted.column(i)));
+ }
+ }
+
+ #[test]
+ fn test_compact_array_compacts_struct_containnig_binary_view() {
+ let i32_values = Arc::new(arrow_array::Int32Array::from(vec![1, 2,
3]));
+ let mut bv_builder = BinaryViewBuilder::new();
+ bv_builder.append_value(b"short");
+ bv_builder.append_value(b"Long string that is definitely longer than
12 bytes");
+ bv_builder.append_value(b"Another long string to make buffer larger");
+ let bv: BinaryViewArray = bv_builder.finish();
+ let struct_array = StructArray::from(vec![
+ (
+ Arc::new(Field::new("a", DataType::Int32, false)),
+ i32_values as ArrayRef,
+ ),
+ (
+ Arc::new(Field::new("s", DataType::BinaryView, false)),
+ Arc::new(bv),
+ ),
+ ]);
+
+ let array: ArrayRef = Arc::new(struct_array);
+ let slice = array.slice(0, 2);
+ let before_size = get_array_memory_size(&array).unwrap();
+
+ let (compacted, mutated) = compact_array(Arc::new(slice)).unwrap();
+ assert!(mutated);
+
+ let after_size = get_array_memory_size(&compacted).unwrap();
+ assert!(after_size < before_size);
+ }
+
+ #[test]
+ fn test_compact_array_compacts_list_of_binary_view() {
+ // Build a List<BinaryView> with many long values. Then slice the list
so it contains
+ // only one row; `compact_array` should compact the nested BinaryView
values.
+ let n = 256;
+ let long = b"Long string that is definitely longer than 12 bytes";
+
+ let mut bv_list_builder = ListBuilder::new(BinaryViewBuilder::new());
+ for i in 0..n {
+ bv_list_builder
+ .values()
+ .append_value([long, i.to_string().as_bytes()].concat());
+ bv_list_builder.append(true);
+ }
+ let bv_list: ListArray = bv_list_builder.finish();
+ let sliced: ArrayRef = Arc::new(bv_list.slice(0, 1));
+ let before_size = get_array_memory_size(&sliced).unwrap();
+
+ let (compacted, mutated) = compact_array(Arc::clone(&sliced)).unwrap();
+ assert!(mutated);
+
+ let after_size = get_array_memory_size(&compacted).unwrap();
+ assert!(after_size <= before_size);
+ }
+
+ #[test]
+ fn test_compact_array_list_without_view_is_noop() {
+ let i32_list: ListArray = ListArray::from_iter_primitive::<Int32Type,
_, _>([
+ Some(vec![Some(1), Some(2), Some(3)]),
+ Some(vec![Some(4)]),
+ ]);
+
+ let array: ArrayRef = Arc::new(i32_list);
+ let (compacted, mutated) = compact_array(Arc::clone(&array)).unwrap();
+ assert!(!mutated);
+ assert!(Arc::ptr_eq(&array, &compacted));
+ }
+
+ #[test]
+ fn test_compact_array_struct_without_view_is_noop() {
+ let i32_values = Arc::new(arrow_array::Int32Array::from(vec![1, 2,
3]));
+ let bool_values = Arc::new(BooleanArray::from(vec![true, false,
true]));
+ let i32_list: ArrayRef =
Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>([
+ Some(vec![Some(1), Some(2)]),
+ Some(vec![Some(3)]),
+ None,
+ ]));
+
+ let struct_array = StructArray::from(vec![
+ (
+ Arc::new(Field::new("a", DataType::Int32, false)),
+ i32_values as ArrayRef,
+ ),
+ (
+ Arc::new(Field::new("b", DataType::Boolean, false)),
+ bool_values as ArrayRef,
+ ),
+ (
+ Arc::new(Field::new("c", i32_list.data_type().clone(), true)),
+ i32_list,
+ ),
+ ]);
+
+ let array: ArrayRef = Arc::new(struct_array);
+ let (compacted, mutated) = compact_array(Arc::clone(&array)).unwrap();
+ assert!(!mutated);
+ assert!(Arc::ptr_eq(&array, &compacted));
+ }
}