This is an automated email from the ASF dual-hosted git repository.
kontinuation pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 9b6e2e73 feat(rust/sedona-spatial-join) Spill EvaluatedBatch and add external evaluated batch stream (#522)
9b6e2e73 is described below
commit 9b6e2e735da9f2c26c25acc29a0d3aee011e2786
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Mon Jan 19 11:53:02 2026 +0800
feat(rust/sedona-spatial-join) Spill EvaluatedBatch and add external evaluated batch stream (#522)
Add spill reader and writer for evaluated batch stream. `EvaluatedBatch` is
wrapped as a regular `RecordBatch` with the following fields:
* `data`: The original batch in `EvaluatedBatch::batch`
* `geom`: The value of `EvaluatedBatch::geom_array::geometry_array`
* `dist`: The value of `EvaluatedBatch::geom_array::distance`
`ExternalEvaluatedBatchStream` implements `EvaluatedBatchStream` and
produces `EvaluatedBatch` values from spill files.
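For illustration, a spilled batch has a layout like the following (a
sketch; the `geom` storage field is produced by
`SedonaType::to_storage_field` and varies with the operand type):

    data: Struct<original input schema fields>  (non-nullable)
    geom: geometry in storage form, e.g. WKB    (nullable)
    dist: Float64                               (nullable)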
Co-authored-by: Dewey Dunnington <[email protected]>
---
Cargo.lock | 1 +
Cargo.toml | 1 +
rust/sedona-spatial-join/Cargo.toml | 11 +
.../external_evaluated_batch_stream.rs | 165 +++++
.../bench/evaluated_batch/spill.rs | 210 ++++++
rust/sedona-spatial-join/src/evaluated_batch.rs | 5 +-
.../src/evaluated_batch/evaluated_batch_stream.rs | 1 +
.../evaluated_batch_stream/evaluate.rs | 66 +-
.../evaluated_batch_stream/external.rs | 638 ++++++++++++++++
.../src/evaluated_batch/spill.rs | 806 +++++++++++++++++++++
.../src/index/build_side_collector.rs | 4 +-
rust/sedona-spatial-join/src/lib.rs | 2 +-
rust/sedona-spatial-join/src/operand_evaluator.rs | 2 +-
rust/sedona-spatial-join/src/utils.rs | 1 +
rust/sedona-spatial-join/src/utils/arrow_utils.rs | 67 +-
rust/sedona-spatial-join/src/utils/spill.rs | 382 ++++++++++
16 files changed, 2286 insertions(+), 76 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 46f9ff1b..f99ff5ce 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5404,6 +5404,7 @@ dependencies = [
"geos",
"once_cell",
"parking_lot",
+ "pin-project-lite",
"rand",
"rstest",
"sedona-common",
diff --git a/Cargo.toml b/Cargo.toml
index 99e084e9..6c20d23d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -92,6 +92,7 @@ dirs = "6.0.0"
env_logger = "0.11"
fastrand = "2.0"
futures = "0.3"
+pin-project-lite = "0.2"
glam = "0.30.10"
object_store = { version = "0.12.4", default-features = false }
float_next_after = "1"
diff --git a/rust/sedona-spatial-join/Cargo.toml b/rust/sedona-spatial-join/Cargo.toml
index b4c8f630..9831c59b 100644
--- a/rust/sedona-spatial-join/Cargo.toml
+++ b/rust/sedona-spatial-join/Cargo.toml
@@ -45,6 +45,7 @@ datafusion-physical-plan = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-common-runtime = { workspace = true }
futures = { workspace = true }
+pin-project-lite = { workspace = true }
once_cell = { workspace = true }
parking_lot = { workspace = true }
geo = { workspace = true }
@@ -89,3 +90,13 @@ harness = false
name = "flat"
path = "bench/partitioning/flat.rs"
harness = false
+
+[[bench]]
+name = "evaluated_batch_spill"
+path = "bench/evaluated_batch/spill.rs"
+harness = false
+
+[[bench]]
+name = "external_evaluated_batch_stream"
+path = "bench/evaluated_batch/external_evaluated_batch_stream.rs"
+harness = false
diff --git a/rust/sedona-spatial-join/bench/evaluated_batch/external_evaluated_batch_stream.rs b/rust/sedona-spatial-join/bench/evaluated_batch/external_evaluated_batch_stream.rs
new file mode 100644
index 00000000..950502be
--- /dev/null
+++ b/rust/sedona-spatial-join/bench/evaluated_batch/external_evaluated_batch_stream.rs
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::hint::black_box;
+use std::sync::Arc;
+
+use arrow_array::{Int32Array, RecordBatch, StringArray};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
+use datafusion::config::SpillCompression;
+use datafusion_common::ScalarValue;
+use datafusion_execution::runtime_env::RuntimeEnv;
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
+use futures::StreamExt;
+use sedona_schema::datatypes::{SedonaType, WKB_GEOMETRY, WKB_VIEW_GEOMETRY};
+use sedona_spatial_join::evaluated_batch::evaluated_batch_stream::external::ExternalEvaluatedBatchStream;
+use sedona_spatial_join::evaluated_batch::spill::EvaluatedBatchSpillWriter;
+use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+use sedona_spatial_join::operand_evaluator::EvaluatedGeometryArray;
+use sedona_testing::create::create_array_storage;
+
+const ROWS: usize = 8192;
+const BATCHES_PER_FILE: usize = 64;
+
+fn make_schema() -> SchemaRef {
+ Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, true),
+ ]))
+}
+
+fn make_evaluated_batch(num_rows: usize, sedona_type: &SedonaType) -> EvaluatedBatch {
+ let schema = make_schema();
+ let ids: Vec<i32> = (0..num_rows).map(|v| v as i32).collect();
+ let id_array = Arc::new(Int32Array::from(ids));
+    let name_array = Arc::new(StringArray::from(vec![Some("Alice"); num_rows]));
+ let batch = RecordBatch::try_new(schema, vec![id_array, name_array])
+ .expect("failed to build record batch for benchmark");
+
+    // Use sedona-testing helpers so this benchmark stays focused on spill + stream I/O.
+    // This builds either a Binary (WKB_GEOMETRY) or BinaryView (WKB_VIEW_GEOMETRY) array.
+ let wkt_values = vec![Some("POINT (0 0)"); num_rows];
+ let geom_array = create_array_storage(&wkt_values, sedona_type);
+
+    let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, sedona_type)
+ .expect("failed to build geometry array for benchmark");
+
+    geom_array.distance = Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0))));
+
+ EvaluatedBatch { batch, geom_array }
+}
+
+fn write_spill_file(
+ env: Arc<RuntimeEnv>,
+ schema: SchemaRef,
+ metrics_set: &ExecutionPlanMetricsSet,
+ sedona_type: &SedonaType,
+ compression: SpillCompression,
+ evaluated_batch: &EvaluatedBatch,
+) -> Arc<datafusion_execution::disk_manager::RefCountedTempFile> {
+ let metrics = SpillMetrics::new(metrics_set, 0);
+ let mut writer = EvaluatedBatchSpillWriter::try_new(
+ env,
+ schema,
+ sedona_type,
+ "bench_external_stream",
+ compression,
+ metrics,
+ None,
+ )
+ .expect("failed to create spill writer for benchmark");
+
+ for _ in 0..BATCHES_PER_FILE {
+ writer
+ .append(evaluated_batch)
+ .expect("failed to append batch in benchmark");
+ }
+
+ Arc::new(writer.finish().expect("failed to finish spill writer"))
+}
+
+fn bench_external_evaluated_batch_stream(c: &mut Criterion) {
+ let env = Arc::new(RuntimeEnv::default());
+ let schema = make_schema();
+ let metrics_set = ExecutionPlanMetricsSet::new();
+
+ let compressions = [
+ ("uncompressed", SpillCompression::Uncompressed),
+ ("lz4", SpillCompression::Lz4Frame),
+ ];
+
+ let runtime = tokio::runtime::Builder::new_current_thread()
+ .build()
+ .expect("failed to create tokio runtime");
+
+    for (label, sedona_type) in [("wkb", WKB_GEOMETRY), ("wkb_view", WKB_VIEW_GEOMETRY)] {
+ let evaluated_batch = make_evaluated_batch(ROWS, &sedona_type);
+
+ for (compression_label, compression) in compressions {
+ let spill_file = write_spill_file(
+ Arc::clone(&env),
+ Arc::clone(&schema),
+ &metrics_set,
+ &sedona_type,
+ compression,
+ &evaluated_batch,
+ );
+
+ let mut group = c.benchmark_group(format!(
+ "external_evaluated_batch_stream/{label}/{compression_label}"
+ ));
+            group.throughput(Throughput::Elements((ROWS * BATCHES_PER_FILE) as u64));
+
+ group.bench_with_input(
+ BenchmarkId::new(
+ "external_stream",
+ format!("rows_{ROWS}_batches_{BATCHES_PER_FILE}"),
+ ),
+ &spill_file,
+ |b, file| {
+ b.iter(|| {
+ runtime.block_on(async {
+                        let stream =
+                            ExternalEvaluatedBatchStream::try_from_spill_file(Arc::clone(file))
+                                .expect("failed to create external evaluated batch stream");
+ futures::pin_mut!(stream);
+
+ let mut rows = 0usize;
+ while let Some(batch) = stream.next().await {
+ let batch =
+                            batch.expect("failed to read evaluated batch from stream");
+ rows += batch.num_rows();
+ black_box(batch);
+ }
+ black_box(rows);
+ })
+ })
+ },
+ );
+
+ group.finish();
+ }
+ }
+}
+
+criterion_group!(
+ external_evaluated_batch_stream,
+ bench_external_evaluated_batch_stream
+);
+criterion_main!(external_evaluated_batch_stream);
diff --git a/rust/sedona-spatial-join/bench/evaluated_batch/spill.rs b/rust/sedona-spatial-join/bench/evaluated_batch/spill.rs
new file mode 100644
index 00000000..b87a6b9c
--- /dev/null
+++ b/rust/sedona-spatial-join/bench/evaluated_batch/spill.rs
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::hint::black_box;
+use std::sync::Arc;
+
+use arrow_array::{Int32Array, RecordBatch, StringArray};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
+use datafusion::config::SpillCompression;
+use datafusion_common::ScalarValue;
+use datafusion_execution::runtime_env::RuntimeEnv;
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
+use sedona_schema::datatypes::{SedonaType, WKB_GEOMETRY, WKB_VIEW_GEOMETRY};
+use sedona_spatial_join::evaluated_batch::spill::{
+ EvaluatedBatchSpillReader, EvaluatedBatchSpillWriter,
+};
+use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+use sedona_spatial_join::operand_evaluator::EvaluatedGeometryArray;
+use sedona_testing::create::create_array_storage;
+
+const ROWS: usize = 8192;
+const BATCHES_PER_FILE: usize = 64;
+
+fn make_schema() -> SchemaRef {
+ Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, true),
+ ]))
+}
+
+fn make_evaluated_batch(num_rows: usize, sedona_type: &SedonaType) -> EvaluatedBatch {
+ let schema = make_schema();
+ let ids: Vec<i32> = (0..num_rows).map(|v| v as i32).collect();
+ let id_array = Arc::new(Int32Array::from(ids));
+    let name_array = Arc::new(StringArray::from(vec![Some("Alice"); num_rows]));
+ let batch = RecordBatch::try_new(schema, vec![id_array, name_array])
+ .expect("failed to build record batch for benchmark");
+
+ // Use sedona-testing helpers so this benchmark stays focused on spill I/O.
+    // This builds either a Binary (WKB_GEOMETRY) or BinaryView (WKB_VIEW_GEOMETRY) array.
+ let wkt_values = vec![Some("POINT (0 0)"); num_rows];
+ let geom_array = create_array_storage(&wkt_values, sedona_type);
+
+    let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, sedona_type)
+ .expect("failed to build geometry array for benchmark");
+
+ // Use a scalar distance so the spilled dist column is constant.
+    geom_array.distance = Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0))));
+
+ EvaluatedBatch { batch, geom_array }
+}
+
+fn write_spill_file(
+ env: Arc<RuntimeEnv>,
+ schema: SchemaRef,
+ metrics_set: &ExecutionPlanMetricsSet,
+ sedona_type: &SedonaType,
+ compression: SpillCompression,
+ evaluated_batch: &EvaluatedBatch,
+) -> Arc<datafusion_execution::disk_manager::RefCountedTempFile> {
+ let metrics = SpillMetrics::new(metrics_set, 0);
+ let mut writer = EvaluatedBatchSpillWriter::try_new(
+ env,
+ schema,
+ sedona_type,
+ "bench_spill",
+ compression,
+ metrics,
+ None,
+ )
+ .expect("failed to create spill writer for benchmark");
+
+ for _ in 0..BATCHES_PER_FILE {
+ writer
+ .append(evaluated_batch)
+ .expect("failed to append batch in benchmark");
+ }
+
+ Arc::new(writer.finish().expect("failed to finish spill writer"))
+}
+
+fn bench_spill_writer_and_reader(c: &mut Criterion) {
+ let env = Arc::new(RuntimeEnv::default());
+ let schema = make_schema();
+ let metrics_set = ExecutionPlanMetricsSet::new();
+
+ let compressions = [
+ ("uncompressed", SpillCompression::Uncompressed),
+ ("lz4", SpillCompression::Lz4Frame),
+ ];
+
+    for (label, sedona_type) in [("wkb", WKB_GEOMETRY), ("wkb_view", WKB_VIEW_GEOMETRY)] {
+ let evaluated_batch = make_evaluated_batch(ROWS, &sedona_type);
+
+ for (compression_label, compression) in compressions {
+ // Prepare a stable spill file for read benchmarks.
+ let spill_file = write_spill_file(
+ Arc::clone(&env),
+ Arc::clone(&schema),
+ &metrics_set,
+ &sedona_type,
+ compression,
+ &evaluated_batch,
+ );
+
+            let mut group =
+                c.benchmark_group(format!("evaluated_batch_spill/{label}/{compression_label}"));
+            group.throughput(Throughput::Elements((ROWS * BATCHES_PER_FILE) as u64));
+
+ group.bench_with_input(
+ BenchmarkId::new(
+ "spill_writer",
+ format!("rows_{ROWS}_batches_{BATCHES_PER_FILE}"),
+ ),
+ &evaluated_batch,
+ |b, batch| {
+ b.iter(|| {
+ let metrics = SpillMetrics::new(&metrics_set, 0);
+ let mut writer = EvaluatedBatchSpillWriter::try_new(
+ Arc::clone(&env),
+ Arc::clone(&schema),
+ &sedona_type,
+ "bench_spill",
+ compression,
+ metrics,
+ None,
+ )
+ .expect("failed to create spill writer");
+
+ for _ in 0..BATCHES_PER_FILE {
+ writer.append(black_box(batch)).unwrap();
+ }
+
+ let file = writer.finish().unwrap();
+ black_box(file);
+ })
+ },
+ );
+
+ group.bench_with_input(
+ BenchmarkId::new(
+ "spill_reader",
+ format!("rows_{ROWS}_batches_{BATCHES_PER_FILE}"),
+ ),
+ &spill_file,
+ |b, file| {
+ b.iter(|| {
+                        let mut reader =
+                            EvaluatedBatchSpillReader::try_new(black_box(file.as_ref()))
+ .expect("failed to create spill reader");
+ let mut rows = 0usize;
+
+ while let Some(batch) = reader.next_batch() {
+                            let batch = batch.expect("failed to read evaluated batch");
+ rows += batch.num_rows();
+ black_box(batch);
+ }
+
+ black_box(rows);
+ })
+ },
+ );
+
+ group.bench_with_input(
+ BenchmarkId::new(
+ "spill_reader_raw",
+ format!("rows_{ROWS}_batches_{BATCHES_PER_FILE}"),
+ ),
+ &spill_file,
+ |b, file| {
+ b.iter(|| {
+                        let mut reader =
+                            EvaluatedBatchSpillReader::try_new(black_box(file.as_ref()))
+ .expect("failed to create spill reader");
+ let mut rows = 0usize;
+
+ while let Some(batch) = reader.next_raw_batch() {
+                            let batch = batch.expect("failed to read record batch");
+ rows += batch.num_rows();
+ black_box(batch);
+ }
+
+ black_box(rows);
+ })
+ },
+ );
+
+ group.finish();
+ }
+ }
+}
+
+criterion_group!(evaluated_batch_spill, bench_spill_writer_and_reader);
+criterion_main!(evaluated_batch_spill);
diff --git a/rust/sedona-spatial-join/src/evaluated_batch.rs b/rust/sedona-spatial-join/src/evaluated_batch.rs
index d44d49ec..aad3f11b 100644
--- a/rust/sedona-spatial-join/src/evaluated_batch.rs
+++ b/rust/sedona-spatial-join/src/evaluated_batch.rs
@@ -27,7 +27,7 @@ use crate::{
/// EvaluatedBatch contains the original record batch from the input stream and the evaluated
/// geometry array.
-pub(crate) struct EvaluatedBatch {
+pub struct EvaluatedBatch {
/// Original record batch polled from the stream
pub batch: RecordBatch,
/// Evaluated geometry array, containing the geometry array containing geometries to be joined,
@@ -65,4 +65,5 @@ impl EvaluatedBatch {
}
}
-pub(crate) mod evaluated_batch_stream;
+pub mod evaluated_batch_stream;
+pub mod spill;
diff --git a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs
index eb1f855c..c18761d2 100644
--- a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs
+++ b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs
@@ -39,4 +39,5 @@ pub(crate) trait EvaluatedBatchStream: Stream<Item = Result<EvaluatedBatch>> {
pub(crate) type SendableEvaluatedBatchStream = Pin<Box<dyn EvaluatedBatchStream + Send>>;
pub(crate) mod evaluate;
+pub mod external;
pub(crate) mod in_mem;
diff --git a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs
index 0baf3c5f..f6e185c3 100644
--- a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs
+++ b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs
@@ -20,7 +20,6 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use arrow_array::RecordBatch;
-use arrow_schema::{DataType, SchemaRef};
use datafusion_common::Result;
use datafusion_physical_plan::{metrics, SendableRecordBatchStream};
use futures::{Stream, StreamExt};
@@ -30,7 +29,7 @@ use crate::evaluated_batch::{
EvaluatedBatch,
};
use crate::operand_evaluator::{EvaluatedGeometryArray, OperandEvaluator};
-use crate::utils::arrow_utils::compact_batch;
+use crate::utils::arrow_utils::{compact_batch, schema_contains_view_types};
/// An evaluator that can evaluate geometry expressions on record batches
/// and produces evaluated geometry arrays.
@@ -86,14 +85,6 @@ impl<E: Evaluator> EvaluateOperandBatchStream<E> {
}
}
-/// Checks if the schema contains any view types (Utf8View or BinaryView).
-fn schema_contains_view_types(schema: &SchemaRef) -> bool {
- schema
- .flattened_fields()
- .iter()
-        .any(|field| matches!(field.data_type(), DataType::Utf8View | DataType::BinaryView))
-}
-
impl<E: Evaluator> EvaluatedBatchStream for EvaluateOperandBatchStream<E> {
fn is_external(&self) -> bool {
false
@@ -160,58 +151,3 @@ pub(crate) fn create_evaluated_probe_stream(
false,
))
}
-
-#[cfg(test)]
-mod tests {
- use std::sync::Arc;
-
- use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
-
- use super::schema_contains_view_types;
-
- fn schema(fields: Vec<Field>) -> SchemaRef {
- Arc::new(Schema::new(fields))
- }
-
- #[test]
- fn test_schema_contains_view_types_top_level() {
- let schema_ref = schema(vec![
- Field::new("a", DataType::Utf8View, true),
- Field::new("b", DataType::BinaryView, true),
- ]);
-
- assert!(schema_contains_view_types(&schema_ref));
-
- // Similar shape but without view types
- let schema_no_view = schema(vec![
- Field::new("a", DataType::Utf8, true),
- Field::new("b", DataType::Binary, true),
- ]);
- assert!(!schema_contains_view_types(&schema_no_view));
- }
-
- #[test]
- fn test_schema_contains_view_types_nested() {
- let nested = Field::new(
- "s",
- DataType::Struct(Fields::from(vec![Field::new(
- "v",
- DataType::Utf8View,
- true,
- )])),
- true,
- );
-
- let schema_ref = schema(vec![nested]);
- assert!(schema_contains_view_types(&schema_ref));
-
- // Nested struct without any view types
- let nested_no_view = Field::new(
- "s",
-            DataType::Struct(Fields::from(vec![Field::new("v", DataType::Utf8, true)])),
- true,
- );
- let schema_no_view = schema(vec![nested_no_view]);
- assert!(!schema_contains_view_types(&schema_no_view));
- }
-}
diff --git a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs
new file mode 100644
index 00000000..67d3538e
--- /dev/null
+++ b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs
@@ -0,0 +1,638 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+ collections::VecDeque,
+ iter,
+ pin::Pin,
+ sync::Arc,
+ task::{Context, Poll},
+};
+
+use arrow_array::RecordBatch;
+use arrow_schema::{Schema, SchemaRef};
+use datafusion_common::{DataFusionError, Result};
+use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::{
+    disk_manager::RefCountedTempFile, RecordBatchStream, SendableRecordBatchStream,
+};
+use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
+use futures::{FutureExt, StreamExt};
+use pin_project_lite::pin_project;
+use sedona_common::sedona_internal_err;
+
+use crate::evaluated_batch::{
+ evaluated_batch_stream::EvaluatedBatchStream,
+ spill::{
+ spilled_batch_to_evaluated_batch, spilled_schema_to_evaluated_schema,
+ EvaluatedBatchSpillReader,
+ },
+ EvaluatedBatch,
+};
+
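+// Bounded prefetch depth: the background task that forwards spilled batches may
+// run at most this many batches ahead of the consumer before backpressure applies.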
+const RECORD_BATCH_CHANNEL_CAPACITY: usize = 2;
+
+pin_project! {
+ /// Streams [`EvaluatedBatch`] values read back from on-disk spill files.
+ ///
+    /// This stream is intended for the “spilled” path where batches have been written to disk and
+    /// must be read back into memory. It wraps an [`ExternalRecordBatchStream`] and uses
+    /// background tasks to prefetch/forward batches so downstream operators can process a batch
+ /// while the next one is being loaded.
+ pub struct ExternalEvaluatedBatchStream {
+ #[pin]
+ inner: RecordBatchToEvaluatedStream,
+ schema: SchemaRef,
+ }
+}
+
+/// States of [`ExternalRecordBatchStream`]'s read loop.
+enum State {
+    /// Waiting to pop the next spill file from the queue.
+    AwaitingFile,
+    /// A blocking task is opening a spill reader for the current file.
+    Opening(SpawnedTask<Result<EvaluatedBatchSpillReader>>),
+    /// A blocking task is reading the next batch; the reader is handed back with the result.
+    Reading(SpawnedTask<(EvaluatedBatchSpillReader, Option<Result<RecordBatch>>)>),
+    /// All spill files are exhausted or the stream terminated on an error.
+    Finished,
+}
+
+impl ExternalEvaluatedBatchStream {
+ /// Creates an external stream from a single spill file.
+    pub fn try_from_spill_file(spill_file: Arc<RefCountedTempFile>) -> Result<Self> {
+        let record_stream =
+            ExternalRecordBatchStream::try_from_spill_files(iter::once(spill_file))?;
+        let evaluated_stream =
+            RecordBatchToEvaluatedStream::try_spawned_evaluated_stream(Box::pin(record_stream))?;
+ let schema = evaluated_stream.schema();
+ Ok(Self {
+ inner: evaluated_stream,
+ schema,
+ })
+ }
+
+ /// Creates an external stream from multiple spill files.
+ ///
+    /// The stream yields the batches from each file in order. When `spill_files` is empty the
+    /// stream is empty (returns `None` immediately) and no schema validation is performed.
+    pub fn try_from_spill_files<I>(schema: SchemaRef, spill_files: I) -> Result<Self>
+ where
+ I: IntoIterator<Item = Arc<RefCountedTempFile>>,
+ {
+        let record_stream = ExternalRecordBatchStream::try_from_spill_files(spill_files)?;
+        if !record_stream.is_empty() {
+            // `ExternalRecordBatchStream` only has a meaningful schema when at least one spill
+            // file is provided. In that case, validate that the caller-provided evaluated schema
+            // matches what would be derived from the spilled schema.
+            let actual_schema = spilled_schema_to_evaluated_schema(&record_stream.schema())?;
+            if schema != actual_schema {
+                return sedona_internal_err!(
+                    "Schema mismatch when creating ExternalEvaluatedBatchStream"
+                );
+            }
+        }
+        let evaluated_stream =
+            RecordBatchToEvaluatedStream::try_spawned_evaluated_stream(Box::pin(record_stream))?;
+ Ok(Self {
+ inner: evaluated_stream,
+ schema,
+ })
+ }
+}
+
+impl EvaluatedBatchStream for ExternalEvaluatedBatchStream {
+ fn is_external(&self) -> bool {
+ true
+ }
+
+ fn schema(&self) -> SchemaRef {
+ Arc::clone(&self.schema)
+ }
+}
+
+impl futures::Stream for ExternalEvaluatedBatchStream {
+ type Item = Result<EvaluatedBatch>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.project().inner.poll_next(cx)
+ }
+}
+
+pin_project! {
+    /// Adapts a [`RecordBatchStream`] containing spilled batches into an [`EvaluatedBatch`] stream.
+    ///
+    /// Each incoming `RecordBatch` is decoded via [`spilled_batch_to_evaluated_batch`]. This type
+ /// also carries the derived evaluated schema for downstream consumers.
+ struct RecordBatchToEvaluatedStream {
+ #[pin]
+ inner: SendableRecordBatchStream,
+ evaluated_schema: SchemaRef,
+ }
+}
+
+impl RecordBatchToEvaluatedStream {
+ fn try_new(inner: SendableRecordBatchStream) -> Result<Self> {
+        let evaluated_schema = spilled_schema_to_evaluated_schema(&inner.schema())?;
+ Ok(Self {
+ inner,
+ evaluated_schema,
+ })
+ }
+
+ /// Buffers `record_stream` by forwarding it through a bounded channel.
+ ///
+    /// This is primarily useful for [`ExternalRecordBatchStream`], where producing the next batch
+    /// may involve disk I/O and `spawn_blocking` work. By polling the source stream in a spawned
+    /// task, we can overlap “load next batch” with “process current batch”, while still applying
+    /// backpressure via [`RECORD_BATCH_CHANNEL_CAPACITY`].
+    ///
+    /// The forwarding task stops when the receiver is dropped or when the source stream yields its
+    /// first error.
+    fn try_spawned_evaluated_stream(record_stream: SendableRecordBatchStream) -> Result<Self> {
+        let schema = record_stream.schema();
+        let mut builder =
+            RecordBatchReceiverStreamBuilder::new(schema, RECORD_BATCH_CHANNEL_CAPACITY);
+ let tx = builder.tx();
+ builder.spawn(async move {
+ let mut record_stream = record_stream;
+ while let Some(batch) = record_stream.next().await {
+ let is_err = batch.is_err();
+ if tx.send(batch).await.is_err() {
+ break;
+ }
+ if is_err {
+ break;
+ }
+ }
+ Ok(())
+ });
+
+ let buffered = builder.build();
+ Self::try_new(buffered)
+ }
+
+ fn schema(&self) -> SchemaRef {
+ Arc::clone(&self.evaluated_schema)
+ }
+}
+
+impl futures::Stream for RecordBatchToEvaluatedStream {
+ type Item = Result<EvaluatedBatch>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+ match this.inner.as_mut().poll_next(cx) {
+ Poll::Ready(Some(Ok(batch))) => {
+ Poll::Ready(Some(spilled_batch_to_evaluated_batch(batch)))
+ }
+ Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
+ Poll::Ready(None) => Poll::Ready(None),
+ Poll::Pending => Poll::Pending,
+ }
+ }
+}
+
+/// Streams raw [`RecordBatch`] values directly from spill files.
+///
+/// This is the lowest-level “read from disk” stream: it opens each spill file, reads the stored
+/// record batches sequentially, and yields them without decoding into [`EvaluatedBatch`].
+///
+/// Schema handling:
+/// - If at least one spill file is provided, the stream schema is taken from the first file.
+/// - If no files are provided, the schema is empty and the stream terminates immediately.
+pub(crate) struct ExternalRecordBatchStream {
+ schema: SchemaRef,
+ state: State,
+ spill_files: VecDeque<Arc<RefCountedTempFile>>,
+ is_empty: bool,
+}
+
+impl ExternalRecordBatchStream {
+    /// Creates a stream over `spill_files`, yielding all batches from each file in order.
+    ///
+    /// This function assumes all spill files were written with a compatible schema.
+ pub fn try_from_spill_files<I>(spill_files: I) -> Result<Self>
+ where
+ I: IntoIterator<Item = Arc<RefCountedTempFile>>,
+ {
+ let spill_files = spill_files.into_iter().collect::<VecDeque<_>>();
+ let (schema, is_empty) = match spill_files.front() {
+ Some(file) => {
+ let reader = EvaluatedBatchSpillReader::try_new(file)?;
+ (reader.schema(), false)
+ }
+ None => (Arc::new(Schema::empty()), true),
+ };
+ Ok(Self {
+ schema,
+ state: State::AwaitingFile,
+ spill_files,
+ is_empty,
+ })
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.is_empty
+ }
+}
+
+impl RecordBatchStream for ExternalRecordBatchStream {
+ fn schema(&self) -> SchemaRef {
+ Arc::clone(&self.schema)
+ }
+}
+
+impl futures::Stream for ExternalRecordBatchStream {
+ type Item = Result<RecordBatch>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let self_mut = self.get_mut();
+
+ loop {
+ match &mut self_mut.state {
+ State::AwaitingFile => match self_mut.spill_files.pop_front() {
+ Some(spill_file) => {
+ let task = SpawnedTask::spawn_blocking(move || {
+ EvaluatedBatchSpillReader::try_new(&spill_file)
+ });
+ self_mut.state = State::Opening(task);
+ }
+ None => {
+ self_mut.state = State::Finished;
+ return Poll::Ready(None);
+ }
+ },
+                State::Opening(task) => match futures::ready!(task.poll_unpin(cx)) {
+                    Err(e) => {
+                        self_mut.state = State::Finished;
+                        return Poll::Ready(Some(Err(DataFusionError::External(Box::new(e)))));
+ }
+ Ok(Err(e)) => {
+ self_mut.state = State::Finished;
+ return Poll::Ready(Some(Err(e)));
+ }
+ Ok(Ok(mut spill_reader)) => {
+ let task = SpawnedTask::spawn_blocking(move || {
+ let next_batch = spill_reader.next_raw_batch();
+ (spill_reader, next_batch)
+ });
+ self_mut.state = State::Reading(task);
+ }
+ },
+                State::Reading(task) => match futures::ready!(task.poll_unpin(cx)) {
+                    Err(e) => {
+                        self_mut.state = State::Finished;
+                        return Poll::Ready(Some(Err(DataFusionError::External(Box::new(e)))));
+ }
+ Ok((_, None)) => {
+ self_mut.state = State::AwaitingFile;
+ continue;
+ }
+ Ok((_, Some(Err(e)))) => {
+ self_mut.state = State::Finished;
+ return Poll::Ready(Some(Err(e)));
+ }
+ Ok((mut spill_reader, Some(Ok(batch)))) => {
+ let task = SpawnedTask::spawn_blocking(move || {
+ let next_batch = spill_reader.next_raw_batch();
+ (spill_reader, next_batch)
+ });
+ self_mut.state = State::Reading(task);
+ return Poll::Ready(Some(Ok(batch)));
+ }
+ },
+ State::Finished => {
+ return Poll::Ready(None);
+ }
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::evaluated_batch::spill::EvaluatedBatchSpillWriter;
+ use crate::operand_evaluator::EvaluatedGeometryArray;
+    use arrow_array::{Array, ArrayRef, BinaryArray, Int32Array, RecordBatch, StringArray};
+ use arrow_schema::{DataType, Field, Schema, SchemaRef};
+ use datafusion::config::SpillCompression;
+ use datafusion_common::{Result, ScalarValue};
+ use datafusion_execution::runtime_env::RuntimeEnv;
+ use datafusion_expr::ColumnarValue;
+    use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
+ use futures::StreamExt;
+ use sedona_schema::datatypes::{SedonaType, WKB_GEOMETRY};
+ use std::sync::Arc;
+
+ fn create_test_runtime_env() -> Result<Arc<RuntimeEnv>> {
+ Ok(Arc::new(RuntimeEnv::default()))
+ }
+
+ fn create_test_schema() -> SchemaRef {
+ Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, true),
+ ]))
+ }
+
+ fn create_test_record_batch(start_id: i32) -> Result<RecordBatch> {
+ let schema = create_test_schema();
+        let id_array = Arc::new(Int32Array::from(vec![start_id, start_id + 1, start_id + 2]));
+        let name_array = Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob"), None]));
+        RecordBatch::try_new(schema, vec![id_array, name_array]).map_err(|e| e.into())
+ }
+
+ fn create_test_geometry_array() -> Result<(ArrayRef, SedonaType)> {
+ // Create WKB encoded points (simple binary data for testing)
+ let point1_wkb: Vec<u8> = vec![
+ 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 240, 63, 0, 0, 0, 0, 0, 0, 0, 64,
+ ];
+ let point2_wkb: Vec<u8> = vec![
+ 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0, 16, 64,
+ ];
+ let point3_wkb: Vec<u8> = vec![
+ 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 64, 0, 0, 0, 0, 0, 0, 24, 64,
+ ];
+
+ let sedona_type = WKB_GEOMETRY;
+ let geom_array: ArrayRef = Arc::new(BinaryArray::from(vec![
+ Some(point1_wkb.as_slice()),
+ Some(point2_wkb.as_slice()),
+ Some(point3_wkb.as_slice()),
+ ]));
+
+ Ok((geom_array, sedona_type))
+ }
+
+ fn create_test_evaluated_batch(start_id: i32) -> Result<EvaluatedBatch> {
+ let batch = create_test_record_batch(start_id)?;
+ let (geom_array, sedona_type) = create_test_geometry_array()?;
+        let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, &sedona_type)?;
+
+ // Add distance as a scalar value
+        geom_array.distance = Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0))));
+
+ Ok(EvaluatedBatch { batch, geom_array })
+ }
+
+    async fn create_spill_file_with_batches(num_batches: usize) -> Result<RefCountedTempFile> {
+ let env = create_test_runtime_env()?;
+ let schema = create_test_schema();
+ let sedona_type = WKB_GEOMETRY;
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let metrics = SpillMetrics::new(&metrics_set, 0);
+
+ let mut writer = EvaluatedBatchSpillWriter::try_new(
+ env,
+ schema,
+ &sedona_type,
+ "test_external_stream",
+ SpillCompression::Uncompressed,
+ metrics,
+ None,
+ )?;
+
+ for i in 0..num_batches {
+ let batch = create_test_evaluated_batch((i * 3) as i32)?;
+ writer.append(&batch)?;
+ }
+
+ writer.finish()
+ }
+
+ #[tokio::test]
+ async fn test_external_stream_creation() -> Result<()> {
+ let spill_file = create_spill_file_with_batches(1).await?;
+        let stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?;
+
+ assert!(stream.is_external());
+ assert_eq!(stream.schema(), create_test_schema());
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_external_stream_single_batch() -> Result<()> {
+ let spill_file = create_spill_file_with_batches(1).await?;
+        let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?;
+
+ let batch = stream.next().await.unwrap()?;
+ assert_eq!(batch.num_rows(), 3);
+
+        // The stream is exhausted; repeated polls should keep returning None
+ assert!(stream.next().await.is_none());
+ assert!(stream.next().await.is_none());
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_external_stream_large_number_of_batches() -> Result<()> {
+ let num_batches = 100;
+ let spill_file = create_spill_file_with_batches(num_batches).await?;
+        let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?;
+
+ let mut count = 0;
+ while let Some(batch_result) = stream.next().await {
+ let batch = batch_result?;
+ assert_eq!(batch.num_rows(), 3);
+ count += 1;
+ }
+
+ assert_eq!(count, num_batches);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_external_stream_is_external_flag() -> Result<()> {
+ let spill_file = create_spill_file_with_batches(1).await?;
+        let stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?;
+
+ // Verify the is_external flag returns true
+ assert!(stream.is_external());
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_external_stream_concurrent_access() -> Result<()> {
+ // Create multiple streams reading from different files
+ let file1 = create_spill_file_with_batches(2).await?;
+ let file2 = create_spill_file_with_batches(3).await?;
+
+        let mut stream1 = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(file1))?;
+        let mut stream2 = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(file2))?;
+
+ // Read from both streams
+ let batch1_1 = stream1.next().await.unwrap()?;
+ let batch2_1 = stream2.next().await.unwrap()?;
+
+ assert_eq!(batch1_1.num_rows(), 3);
+ assert_eq!(batch2_1.num_rows(), 3);
+
+ // Continue reading
+ let batch1_2 = stream1.next().await.unwrap()?;
+ let batch2_2 = stream2.next().await.unwrap()?;
+
+ assert_eq!(batch1_2.num_rows(), 3);
+ assert_eq!(batch2_2.num_rows(), 3);
+
+ // Stream1 should be done, stream2 should have one more
+ assert!(stream1.next().await.is_none());
+ let batch2_3 = stream2.next().await.unwrap()?;
+ assert_eq!(batch2_3.num_rows(), 3);
+ assert!(stream2.next().await.is_none());
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_external_stream_multiple_spill_files() -> Result<()> {
+ let file1 = create_spill_file_with_batches(2).await?;
+ let file2 = create_spill_file_with_batches(3).await?;
+ let schema = create_test_schema();
+ let mut stream = ExternalEvaluatedBatchStream::try_from_spill_files(
+ Arc::clone(&schema),
+ vec![Arc::new(file1), Arc::new(file2)],
+ )?;
+
+ assert_eq!(stream.schema(), schema);
+
+ let mut batches_read = 0;
+ while let Some(batch_result) = stream.next().await {
+ let batch = batch_result?;
+ assert_eq!(batch.num_rows(), 3);
+ batches_read += 1;
+ }
+
+ assert_eq!(batches_read, 5);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_external_stream_empty_file() -> Result<()> {
+ let spill_file = create_spill_file_with_batches(0).await?;
+        let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?;
+
+ // Should immediately return None
+ let batch = stream.next().await;
+ assert!(batch.is_none());
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_external_stream_empty_spill_file_list() -> Result<()> {
+ let schema = create_test_schema();
+ let stream = ExternalEvaluatedBatchStream::try_from_spill_files(
+ Arc::clone(&schema),
+ Vec::<Arc<RefCountedTempFile>>::new(),
+ )?;
+
+ assert_eq!(stream.schema(), schema);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_external_stream_preserves_data() -> Result<()> {
+ let spill_file = create_spill_file_with_batches(1).await?;
+        let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?;
+
+ let batch = stream.next().await.unwrap()?;
+
+ // Verify data preservation
+ let id_array = batch
+ .batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert_eq!(id_array.value(0), 0);
+ assert_eq!(id_array.value(1), 1);
+ assert_eq!(id_array.value(2), 2);
+
+ let name_array = batch
+ .batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert_eq!(name_array.value(0), "Alice");
+ assert_eq!(name_array.value(1), "Bob");
+ assert!(name_array.is_null(2));
+
+ // Verify geometry array
+ assert_eq!(batch.geom_array.rects.len(), 3);
+
+ // Verify distance
+ match &batch.geom_array.distance {
+ Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(val)))) => {
+ assert_eq!(*val, 10.0);
+ }
+ _ => panic!("Expected scalar distance value"),
+ }
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_external_stream_multiple_batches() -> Result<()> {
+ let num_batches = 5;
+ let spill_file = create_spill_file_with_batches(num_batches).await?;
+        let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?;
+
+ let mut batches_read = 0;
+ while let Some(batch_result) = stream.next().await {
+ let batch = batch_result?;
+ assert_eq!(batch.num_rows(), 3);
+
+ // Verify the ID starts at the expected value
+ let id_array = batch
+ .batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ let expected_start_id = (batches_read * 3) as i32;
+ assert_eq!(id_array.value(0), expected_start_id);
+
+ batches_read += 1;
+ }
+
+ assert_eq!(batches_read, num_batches);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_external_stream_poll_after_completion() -> Result<()> {
+ let spill_file = create_spill_file_with_batches(1).await?;
+        let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?;
+
+ // Read the batch
+ let _ = stream.next().await.unwrap()?;
+
+ // Should return None
+ assert!(stream.next().await.is_none());
+
+ // Polling again should still return None
+ assert!(stream.next().await.is_none());
+ assert!(stream.next().await.is_none());
+
+ Ok(())
+ }
+}
diff --git a/rust/sedona-spatial-join/src/evaluated_batch/spill.rs b/rust/sedona-spatial-join/src/evaluated_batch/spill.rs
new file mode 100644
index 00000000..9d5dd8a8
--- /dev/null
+++ b/rust/sedona-spatial-join/src/evaluated_batch/spill.rs
@@ -0,0 +1,806 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::Float64Array;
+use arrow_array::{Array, RecordBatch, StructArray};
+use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
+use datafusion::config::SpillCompression;
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_execution::{disk_manager::RefCountedTempFile, runtime_env::RuntimeEnv};
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_plan::metrics::SpillMetrics;
+use sedona_common::sedona_internal_err;
+use sedona_schema::datatypes::SedonaType;
+
+use crate::{
+ evaluated_batch::EvaluatedBatch,
+ operand_evaluator::EvaluatedGeometryArray,
+ utils::spill::{RecordBatchSpillReader, RecordBatchSpillWriter},
+};
+
+/// Writer for spilling evaluated batches to disk
+pub struct EvaluatedBatchSpillWriter {
+ /// The temporary spill file being written to
+ inner: RecordBatchSpillWriter,
+
+    /// Schema of the spilled record batches. It is augmented from the schema of the original
+    /// record batches. The spill_schema has 3 fields:
+    /// * `data`: StructArray containing the original record batch columns
+    /// * `geom`: geometry array in storage format
+    /// * `dist`: distance field
+ spill_schema: Schema,
+ /// Inner fields of the "data" StructArray in the spilled record batches
+ data_inner_fields: Fields,
+}
+
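+// Column positions of the `data`, `geom`, and `dist` fields in a spilled record batch.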
+const SPILL_FIELD_DATA_INDEX: usize = 0;
+const SPILL_FIELD_GEOM_INDEX: usize = 1;
+const SPILL_FIELD_DIST_INDEX: usize = 2;
+
+impl EvaluatedBatchSpillWriter {
+ /// Create a new SpillWriter
+ pub fn try_new(
+ env: Arc<RuntimeEnv>,
+ schema: SchemaRef,
+ sedona_type: &SedonaType,
+ request_description: &str,
+ compression: SpillCompression,
+ metrics: SpillMetrics,
+ batch_size_threshold: Option<usize>,
+ ) -> Result<Self> {
+        // Construct schema of record batches to be written. The written batches are augmented from the original record batches.
+ let data_inner_fields = schema.fields().clone();
+ let data_struct_field =
+ Field::new("data", DataType::Struct(data_inner_fields.clone()),
false);
+ let geom_field = sedona_type.to_storage_field("geom", true)?;
+ let dist_field = Field::new("dist", DataType::Float64, true);
+        let spill_schema = Schema::new(vec![data_struct_field, geom_field, dist_field]);
+
+ // Create spill file
+ let inner = RecordBatchSpillWriter::try_new(
+ env,
+ Arc::new(spill_schema.clone()),
+ request_description,
+ compression,
+ metrics,
+ batch_size_threshold,
+ )?;
+
+ Ok(Self {
+ inner,
+ spill_schema,
+ data_inner_fields,
+ })
+ }
+
+ /// Append an EvaluatedBatch to the spill file
+ pub fn append(&mut self, evaluated_batch: &EvaluatedBatch) -> Result<()> {
+ let record_batch = self.spilled_record_batch(evaluated_batch)?;
+
+        // Splitting/compaction and spill bytes/rows metrics are handled by `RecordBatchSpillWriter`.
+ self.inner.write_batch(record_batch)?;
+ Ok(())
+ }
+
+ /// Finish writing and return the temporary file
+ pub fn finish(self) -> Result<RefCountedTempFile> {
+ self.inner.finish()
+ }
+
+    fn spilled_record_batch(&self, evaluated_batch: &EvaluatedBatch) -> Result<RecordBatch> {
+ let num_rows = evaluated_batch.num_rows();
+
+ // Store the original data batch into a StructArray
+ let data_batch = &evaluated_batch.batch;
+ let data_arrays = data_batch.columns().to_vec();
+ let data_struct_array =
+            StructArray::try_new(self.data_inner_fields.clone(), data_arrays, None)?;
+
+ // Store dist into a Float64Array
+        let mut dist_builder = arrow::array::Float64Builder::with_capacity(num_rows);
+ let geom_array = &evaluated_batch.geom_array;
+ match &geom_array.distance {
+ Some(ColumnarValue::Scalar(scalar)) => match scalar {
+ ScalarValue::Float64(dist_value) => {
+ for _ in 0..num_rows {
+ dist_builder.append_option(*dist_value);
+ }
+ }
+ _ => {
+                    return sedona_internal_err!("Distance columnar value is not a Float64Array");
+ }
+ },
+ Some(ColumnarValue::Array(array)) => {
+ let float_array = array
+ .as_any()
+ .downcast_ref::<arrow::array::Float64Array>()
+ .unwrap();
+ dist_builder.append_array(float_array);
+ }
+ None => {
+ for _ in 0..num_rows {
+ dist_builder.append_null();
+ }
+ }
+ }
+ let dist_array = dist_builder.finish();
+
+ // Assemble the final spilled RecordBatch
+ let columns = vec![
+ Arc::new(data_struct_array) as Arc<dyn arrow::array::Array>,
+ Arc::clone(&geom_array.geometry_array),
+ Arc::new(dist_array) as Arc<dyn arrow::array::Array>,
+ ];
+ let spilled_record_batch =
+            RecordBatch::try_new(Arc::new(self.spill_schema.clone()), columns)?;
+ Ok(spilled_record_batch)
+ }
+}
+/// Reader for reading spilled evaluated batches from disk
+pub struct EvaluatedBatchSpillReader {
+ inner: RecordBatchSpillReader,
+}
+impl EvaluatedBatchSpillReader {
+ /// Create a new SpillReader
+ pub fn try_new(temp_file: &RefCountedTempFile) -> Result<Self> {
+ Ok(Self {
+ inner: RecordBatchSpillReader::try_new(temp_file)?,
+ })
+ }
+
+ /// Get the schema of the spilled data
+ pub fn schema(&self) -> SchemaRef {
+ self.inner.schema()
+ }
+
+ /// Read the next EvaluatedBatch from the spill file
+ #[allow(unused)]
+ pub fn next_batch(&mut self) -> Option<Result<EvaluatedBatch>> {
+ self.next_raw_batch()
+            .map(|record_batch| record_batch.and_then(spilled_batch_to_evaluated_batch))
+ }
+
+ /// Read the next raw RecordBatch from the spill file
+ pub fn next_raw_batch(&mut self) -> Option<Result<RecordBatch>> {
+ self.inner.next_batch()
+ }
+}
+
+pub(crate) fn spilled_batch_to_evaluated_batch(
+ record_batch: RecordBatch,
+) -> Result<EvaluatedBatch> {
+    // Extract the data struct array (column 0) and convert back to the original RecordBatch
+ let data_array = record_batch
+ .column(SPILL_FIELD_DATA_INDEX)
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .ok_or_else(|| {
+ DataFusionError::Internal("Expected data column to be a
StructArray".to_string())
+ })?;
+
+ let data_schema = Arc::new(Schema::new(match data_array.data_type() {
+ DataType::Struct(fields) => fields.clone(),
+ _ => {
+ return Err(DataFusionError::Internal(
+ "Expected data column to have Struct data type".to_string(),
+ ))
+ }
+ }));
+
+ let data_columns = (0..data_array.num_columns())
+ .map(|i| Arc::clone(data_array.column(i)))
+ .collect::<Vec<_>>();
+
+ let batch = RecordBatch::try_new(data_schema, data_columns)?;
+
+ // Extract the geometry array (column 1)
+ let geom_array = Arc::clone(record_batch.column(SPILL_FIELD_GEOM_INDEX));
+
+    // Determine the SedonaType from the geometry field in the record batch schema
+ let schema = record_batch.schema();
+ let geom_field = schema.field(SPILL_FIELD_GEOM_INDEX);
+ let sedona_type = SedonaType::from_storage_field(geom_field)?;
+
+    // Extract the distance array (column 2) and convert back to ColumnarValue
+ let dist_array = record_batch
+ .column(SPILL_FIELD_DIST_INDEX)
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .ok_or_else(|| {
+ DataFusionError::Internal("Expected dist column to be
Float64Array".to_string())
+ })?;
+
+ let distance = if !dist_array.is_empty() {
+ // Check if all values are the same (scalar case)
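+        // Note: an array distance whose values happen to be all equal round-trips
+        // as a scalar, so consumers must treat the two representations as equivalent.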
+ let first_value = if dist_array.is_null(0) {
+ None
+ } else {
+ Some(dist_array.value(0))
+ };
+
+ let all_same = (1..dist_array.len()).all(|i| {
+ let current_value = if dist_array.is_null(i) {
+ None
+ } else {
+ Some(dist_array.value(i))
+ };
+ current_value == first_value
+ });
+
+ if all_same {
+ Some(ColumnarValue::Scalar(ScalarValue::Float64(first_value)))
+ } else {
+ Some(ColumnarValue::Array(Arc::clone(
+ record_batch.column(SPILL_FIELD_DIST_INDEX),
+ )))
+ }
+ } else {
+ None
+ };
+
+ // Create EvaluatedGeometryArray
+ let mut geom_array = EvaluatedGeometryArray::try_new(geom_array,
&sedona_type)?;
+ geom_array.distance = distance;
+
+ Ok(EvaluatedBatch { batch, geom_array })
+}
+
+pub(crate) fn spilled_schema_to_evaluated_schema(spilled_schema: &SchemaRef) -> Result<SchemaRef> {
+ if spilled_schema.fields().is_empty() {
+ return Ok(SchemaRef::new(Schema::empty()));
+ }
+
+ let data_field = spilled_schema.field(SPILL_FIELD_DATA_INDEX);
+ let inner_fields = match data_field.data_type() {
+ DataType::Struct(fields) => fields.clone(),
+ _ => {
+ return sedona_internal_err!("Invalid schema of spilled file:
{:?}", spilled_schema);
+ }
+ };
+ Ok(SchemaRef::new(Schema::new(inner_fields)))
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::utils::arrow_utils::get_record_batch_memory_size;
+ use arrow_array::{ArrayRef, BinaryArray, Int32Array, StringArray};
+ use arrow_schema::{DataType, Field, Schema};
+ use datafusion_common::Result;
+ use datafusion_execution::runtime_env::RuntimeEnv;
+ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+ use sedona_schema::datatypes::WKB_GEOMETRY;
+ use std::sync::Arc;
+
+ fn create_test_runtime_env() -> Result<Arc<RuntimeEnv>> {
+ Ok(Arc::new(RuntimeEnv::default()))
+ }
+
+ fn create_test_schema() -> SchemaRef {
+ Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, true),
+ ]))
+ }
+
+ fn create_test_record_batch() -> Result<RecordBatch> {
+ let schema = create_test_schema();
+ let id_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
+        let name_array = Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob"), None]));
+        RecordBatch::try_new(schema, vec![id_array, name_array]).map_err(|e| e.into())
+ }
+
+ fn create_test_geometry_array() -> Result<(ArrayRef, SedonaType)> {
+ // Create WKB encoded points (simple binary data for testing)
+        // WKB for POINT (1 2): 01 01000000 0000000000000000F03F 0000000000000040
+ let point1_wkb: Vec<u8> = vec![
+ 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 240, 63, 0, 0, 0, 0, 0, 0, 0, 64,
+ ];
+ let point2_wkb: Vec<u8> = vec![
+ 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0, 16, 64,
+ ];
+ let point3_wkb: Vec<u8> = vec![
+ 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 64, 0, 0, 0, 0, 0, 0, 24, 64,
+ ];
+
+ let sedona_type = WKB_GEOMETRY;
+ let geom_array: ArrayRef = Arc::new(BinaryArray::from(vec![
+ Some(point1_wkb.as_slice()),
+ Some(point2_wkb.as_slice()),
+ Some(point3_wkb.as_slice()),
+ ]));
+
+ Ok((geom_array, sedona_type))
+ }
+
+ fn create_test_evaluated_batch() -> Result<EvaluatedBatch> {
+ let batch = create_test_record_batch()?;
+ let (geom_array, sedona_type) = create_test_geometry_array()?;
+        let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, &sedona_type)?;
+
+ // Add distance as a scalar value
+        geom_array.distance = Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0))));
+
+ Ok(EvaluatedBatch { batch, geom_array })
+ }
+
+    fn create_test_evaluated_batch_with_array_distance() -> Result<EvaluatedBatch> {
+ let batch = create_test_record_batch()?;
+ let (geom_array, sedona_type) = create_test_geometry_array()?;
+        let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, &sedona_type)?;
+
+ // Add distance as an array value
+        let dist_array = Arc::new(Float64Array::from(vec![Some(1.0), Some(2.0), Some(3.0)]));
+ geom_array.distance = Some(ColumnarValue::Array(dist_array));
+
+ Ok(EvaluatedBatch { batch, geom_array })
+ }
+
+ fn create_test_evaluated_batch_with_nulls() -> Result<EvaluatedBatch> {
+ let schema = create_test_schema();
+ let id_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
+ let name_array = Arc::new(StringArray::from(vec![
+ Some("Alice"),
+ None,
+ Some("Charlie"),
+ ]));
+ let batch = RecordBatch::try_new(schema, vec![id_array, name_array])
+ .map_err(DataFusionError::from)?;
+
+ // Create geometry array with null in the middle
+ let point1_wkb: Vec<u8> = vec![
+ 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 240, 63, 0, 0, 0, 0, 0, 0, 0, 64,
+ ];
+ let point3_wkb: Vec<u8> = vec![
+ 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 64, 0, 0, 0, 0, 0, 0, 24, 64,
+ ];
+
+ let sedona_type = WKB_GEOMETRY;
+ let geom_array: ArrayRef = Arc::new(BinaryArray::from(vec![
+ Some(point1_wkb.as_slice()),
+ None,
+ Some(point3_wkb.as_slice()),
+ ]));
+
+        let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, &sedona_type)?;
+
+ // Add distance with nulls
+        let dist_array = Arc::new(Float64Array::from(vec![Some(1.0), None, Some(3.0)]));
+ geom_array.distance = Some(ColumnarValue::Array(dist_array));
+
+ Ok(EvaluatedBatch { batch, geom_array })
+ }
+
+ #[test]
+ fn test_spill_writer_creation() -> Result<()> {
+ let env = create_test_runtime_env()?;
+ let schema = create_test_schema();
+ let sedona_type = WKB_GEOMETRY;
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let metrics = SpillMetrics::new(&metrics_set, 0);
+
+ let writer = EvaluatedBatchSpillWriter::try_new(
+ env,
+ schema,
+ &sedona_type,
+ "test_spill",
+ SpillCompression::Uncompressed,
+ metrics,
+ None,
+ )?;
+
+ // Verify the spill schema has the expected structure
+ assert_eq!(writer.spill_schema.fields().len(), 3);
+ assert_eq!(
+ writer.spill_schema.field(SPILL_FIELD_DATA_INDEX).name(),
+ "data"
+ );
+ assert_eq!(
+ writer.spill_schema.field(SPILL_FIELD_GEOM_INDEX).name(),
+ "geom"
+ );
+ assert_eq!(
+ writer.spill_schema.field(SPILL_FIELD_DIST_INDEX).name(),
+ "dist"
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_spill_write_and_read_basic() -> Result<()> {
+ let env = create_test_runtime_env()?;
+ let schema = create_test_schema();
+ let sedona_type = WKB_GEOMETRY;
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let metrics = SpillMetrics::new(&metrics_set, 0);
+
+ let mut writer = EvaluatedBatchSpillWriter::try_new(
+ env,
+ schema,
+ &sedona_type,
+ "test_spill",
+ SpillCompression::Uncompressed,
+ metrics,
+ None,
+ )?;
+
+ let evaluated_batch = create_test_evaluated_batch()?;
+ let original_num_rows = evaluated_batch.num_rows();
+
+ writer.append(&evaluated_batch)?;
+ let temp_file = writer.finish()?;
+
+ // Read back the spilled data
+ let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
+ let read_batch_result = reader.next_batch();
+
+ assert!(read_batch_result.is_some());
+ let read_batch = read_batch_result.unwrap()?;
+
+ // Verify the data
+ assert_eq!(read_batch.num_rows(), original_num_rows);
+ assert_eq!(read_batch.batch.num_columns(), 2); // id and name columns
+
+ // Verify that there are no more batches
+ assert!(reader.next_batch().is_none());
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_spill_write_and_read_with_array_distance() -> Result<()> {
+ let env = create_test_runtime_env()?;
+ let schema = create_test_schema();
+ let sedona_type = WKB_GEOMETRY;
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let metrics = SpillMetrics::new(&metrics_set, 0);
+
+ let mut writer = EvaluatedBatchSpillWriter::try_new(
+ env,
+ schema,
+ &sedona_type,
+ "test_spill",
+ SpillCompression::Uncompressed,
+ metrics,
+ None,
+ )?;
+
+        let evaluated_batch = create_test_evaluated_batch_with_array_distance()?;
+ writer.append(&evaluated_batch)?;
+ let temp_file = writer.finish()?;
+
+ // Read back the spilled data
+ let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
+ let read_batch = reader.next_batch().unwrap()?;
+
+ // Verify distance is read back as array
+ match &read_batch.geom_array.distance {
+ Some(ColumnarValue::Array(array)) => {
+                let float_array = array.as_any().downcast_ref::<Float64Array>().unwrap();
+ assert_eq!(float_array.len(), 3);
+ assert_eq!(float_array.value(0), 1.0);
+ assert_eq!(float_array.value(1), 2.0);
+ assert_eq!(float_array.value(2), 3.0);
+ }
+ _ => panic!("Expected distance to be an array"),
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_spill_write_and_read_with_nulls() -> Result<()> {
+ let env = create_test_runtime_env()?;
+ let schema = create_test_schema();
+ let sedona_type = WKB_GEOMETRY;
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let metrics = SpillMetrics::new(&metrics_set, 0);
+
+ let mut writer = EvaluatedBatchSpillWriter::try_new(
+ env,
+ schema,
+ &sedona_type,
+ "test_spill",
+ SpillCompression::Uncompressed,
+ metrics,
+ None,
+ )?;
+
+ let evaluated_batch = create_test_evaluated_batch_with_nulls()?;
+ writer.append(&evaluated_batch)?;
+ let temp_file = writer.finish()?;
+
+ // Read back the spilled data
+ let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
+ let read_batch = reader.next_batch().unwrap()?;
+
+ // Verify nulls are preserved
+ assert_eq!(read_batch.num_rows(), 3);
+ assert!(read_batch.geom_array.rects[1].is_none()); // Null geometry
+
+ // Verify distance nulls
+ match &read_batch.geom_array.distance {
+ Some(ColumnarValue::Array(array)) => {
+ let float_array = array.as_any().downcast_ref::<Float64Array>().unwrap();
+ assert!(float_array.is_valid(0));
+ assert!(float_array.is_null(1));
+ assert!(float_array.is_valid(2));
+ }
+ _ => panic!("Expected distance to be an array"),
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_spill_multiple_batches() -> Result<()> {
+ let env = create_test_runtime_env()?;
+ let schema = create_test_schema();
+ let sedona_type = WKB_GEOMETRY;
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let metrics = SpillMetrics::new(&metrics_set, 0);
+
+ let mut writer = EvaluatedBatchSpillWriter::try_new(
+ env,
+ schema,
+ &sedona_type,
+ "test_spill",
+ SpillCompression::Uncompressed,
+ metrics,
+ None,
+ )?;
+
+ // Write multiple batches
+ let batch1 = create_test_evaluated_batch()?;
+ let batch2 = create_test_evaluated_batch_with_array_distance()?;
+ let batch3 = create_test_evaluated_batch_with_nulls()?;
+
+ writer.append(&batch1)?;
+ writer.append(&batch2)?;
+ writer.append(&batch3)?;
+ let temp_file = writer.finish()?;
+
+ // Read back all batches
+ let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
+
+ let read_batch1 = reader.next_batch().unwrap()?;
+ assert_eq!(read_batch1.num_rows(), 3);
+
+ let read_batch2 = reader.next_batch().unwrap()?;
+ assert_eq!(read_batch2.num_rows(), 3);
+
+ let read_batch3 = reader.next_batch().unwrap()?;
+ assert_eq!(read_batch3.num_rows(), 3);
+
+ // Verify no more batches
+ assert!(reader.next_batch().is_none());
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_spill_metrics_updated() -> Result<()> {
+ let env = create_test_runtime_env()?;
+ let schema = create_test_schema();
+ let sedona_type = WKB_GEOMETRY;
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let metrics = SpillMetrics::new(&metrics_set, 0);
+
+ let mut writer = EvaluatedBatchSpillWriter::try_new(
+ env,
+ schema,
+ &sedona_type,
+ "test_spill",
+ SpillCompression::Uncompressed,
+ metrics.clone(),
+ None,
+ )?;
+
+ let evaluated_batch = create_test_evaluated_batch()?;
+ writer.append(&evaluated_batch)?;
+ writer.finish()?;
+
+ // Verify spill metrics were updated
+ assert!(metrics.spilled_rows.value() > 0);
+ assert!(metrics.spilled_bytes.value() > 0);
+
+ // Verify spill file count was updated
+ assert_eq!(metrics.spill_file_count.value(), 1);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_spill_rect_preservation() -> Result<()> {
+ let env = create_test_runtime_env()?;
+ let schema = create_test_schema();
+ let sedona_type = WKB_GEOMETRY;
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let metrics = SpillMetrics::new(&metrics_set, 0);
+
+ let mut writer = EvaluatedBatchSpillWriter::try_new(
+ env,
+ schema,
+ &sedona_type,
+ "test_spill",
+ SpillCompression::Uncompressed,
+ metrics,
+ None,
+ )?;
+
+ let evaluated_batch = create_test_evaluated_batch()?;
+ let original_rects = evaluated_batch.rects().clone();
+
+ writer.append(&evaluated_batch)?;
+ let temp_file = writer.finish()?;
+
+ // Read back and verify rects
+ let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
+ let read_batch = reader.next_batch().unwrap()?;
+
+ assert_eq!(read_batch.rects().len(), original_rects.len());
+ for (original, read) in original_rects.iter().zip(read_batch.rects().iter()) {
+ match (original, read) {
+ (Some(orig_rect), Some(read_rect)) => {
+ assert_eq!(orig_rect.min().x, read_rect.min().x);
+ assert_eq!(orig_rect.min().y, read_rect.min().y);
+ assert_eq!(orig_rect.max().x, read_rect.max().x);
+ assert_eq!(orig_rect.max().y, read_rect.max().y);
+ }
+ (None, None) => {}
+ _ => panic!("Rect mismatch between original and read"),
+ }
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_spill_scalar_distance_preserved() -> Result<()> {
+ let env = create_test_runtime_env()?;
+ let schema = create_test_schema();
+ let sedona_type = WKB_GEOMETRY;
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let metrics = SpillMetrics::new(&metrics_set, 0);
+
+ let mut writer = EvaluatedBatchSpillWriter::try_new(
+ env,
+ schema,
+ &sedona_type,
+ "test_spill",
+ SpillCompression::Uncompressed,
+ metrics,
+ None,
+ )?;
+
+ let evaluated_batch = create_test_evaluated_batch()?;
+ writer.append(&evaluated_batch)?;
+ let temp_file = writer.finish()?;
+
+ // Read back and verify scalar distance is preserved
+ let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
+ let read_batch = reader.next_batch().unwrap()?;
+
+ match &read_batch.geom_array.distance {
+ Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(val)))) => {
+ assert_eq!(*val, 10.0);
+ }
+ _ => panic!("Expected scalar distance value"),
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_spill_empty_batch() -> Result<()> {
+ let env = create_test_runtime_env()?;
+ let schema = create_test_schema();
+ let sedona_type = WKB_GEOMETRY;
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let metrics = SpillMetrics::new(&metrics_set, 0);
+
+ let mut writer = EvaluatedBatchSpillWriter::try_new(
+ env,
+ schema.clone(),
+ &sedona_type,
+ "test_spill",
+ SpillCompression::Uncompressed,
+ metrics,
+ None,
+ )?;
+
+ // Create an empty batch
+ let id_array = Arc::new(Int32Array::from(Vec::<i32>::new()));
+ let name_array = Arc::new(StringArray::from(Vec::<Option<&str>>::new()));
+ let empty_batch = RecordBatch::try_new(schema, vec![id_array, name_array])
+ .map_err(DataFusionError::from)?;
+
+ let geom_array: ArrayRef = Arc::new(BinaryArray::from(Vec::<Option<&[u8]>>::new()));
+ let geom_array = EvaluatedGeometryArray::try_new(geom_array, &sedona_type)?;
+
+ let evaluated_batch = EvaluatedBatch {
+ batch: empty_batch,
+ geom_array,
+ };
+
+ writer.append(&evaluated_batch)?;
+ let temp_file = writer.finish()?;
+
+ // Read back and verify
+ let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
+ let read_batch = reader.next_batch().unwrap()?;
+ assert_eq!(read_batch.num_rows(), 0);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_spill_batch_splitting() -> Result<()> {
+ let env = create_test_runtime_env()?;
+ let schema = create_test_schema();
+ let sedona_type = WKB_GEOMETRY;
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let metrics = SpillMetrics::new(&metrics_set, 0);
+
+ // Create a batch
+ let evaluated_batch = create_test_evaluated_batch()?;
+ let batch_size = get_record_batch_memory_size(&evaluated_batch.batch)?;
+
+ // Set the threshold smaller than the batch size so the batch splits into at least 2 parts
+ let threshold = batch_size / 2;
+
+ let mut writer = EvaluatedBatchSpillWriter::try_new(
+ env,
+ schema,
+ &sedona_type,
+ "test_spill",
+ SpillCompression::Uncompressed,
+ metrics,
+ Some(threshold),
+ )?;
+
+ writer.append(&evaluated_batch)?;
+ let temp_file = writer.finish()?;
+
+ // Read back the spilled data
+ let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
+
+ // We expect multiple batches
+ let mut num_batches = 0;
+ let mut total_rows = 0;
+ while let Some(batch_result) = reader.next_batch() {
+ let batch = batch_result?;
+ num_batches += 1;
+ total_rows += batch.num_rows();
+ }
+
+ assert!(
+ num_batches > 1,
+ "Batch should have been split into multiple batches"
+ );
+ assert_eq!(
+ total_rows,
+ evaluated_batch.num_rows(),
+ "Total rows should match"
+ );
+
+ Ok(())
+ }
+}
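
The tests above exercise the full spill round trip; condensed, the intended usage pattern is the sketch below. It is illustrative only: the visibility of `EvaluatedBatchSpillWriter`/`EvaluatedBatchSpillReader` from outside this module, the origin of `WKB_GEOMETRY` (sedona-schema), and the `make_test_batch()` helper are assumptions, not part of this commit.

// Sketch: spill EvaluatedBatches to a temp file, then read them back.
// `make_test_batch()` is a hypothetical source of `EvaluatedBatch` values.
use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion::config::SpillCompression;
use datafusion_common::Result;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};

fn spill_round_trip(env: Arc<RuntimeEnv>, schema: SchemaRef) -> Result<usize> {
    let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
    let mut writer = EvaluatedBatchSpillWriter::try_new(
        env,
        schema,
        &WKB_GEOMETRY,
        "sketch_spill",
        SpillCompression::Uncompressed,
        metrics,
        None, // no size threshold: each appended batch is written as one IPC batch
    )?;
    writer.append(&make_test_batch()?)?;
    let temp_file = writer.finish()?; // removed once the last handle drops

    let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
    let mut rows = 0;
    while let Some(batch) = reader.next_batch() {
        rows += batch?.num_rows();
    }
    Ok(rows)
}
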
diff --git a/rust/sedona-spatial-join/src/index/build_side_collector.rs b/rust/sedona-spatial-join/src/index/build_side_collector.rs
index a1643834..9230aaf4 100644
--- a/rust/sedona-spatial-join/src/index/build_side_collector.rs
+++ b/rust/sedona-spatial-join/src/index/build_side_collector.rs
@@ -140,7 +140,7 @@ impl BuildSideBatchesCollector {
self.collect_all_concurrently(streams, reservations, metrics_vec)
.await
} else {
- self.collect_all_sequential(streams, reservations, metrics_vec)
+ self.collect_all_sequentially(streams, reservations, metrics_vec)
.await
}
}
@@ -187,7 +187,7 @@ impl BuildSideBatchesCollector {
Ok(partitions.into_iter().map(|v| v.unwrap()).collect())
}
- async fn collect_all_sequential(
+ async fn collect_all_sequentially(
&self,
streams: Vec<SendableRecordBatchStream>,
reservations: Vec<MemoryReservation>,
diff --git a/rust/sedona-spatial-join/src/lib.rs b/rust/sedona-spatial-join/src/lib.rs
index 0c99b91b..94af3f22 100644
--- a/rust/sedona-spatial-join/src/lib.rs
+++ b/rust/sedona-spatial-join/src/lib.rs
@@ -16,7 +16,7 @@
// under the License.
mod build_index;
-mod evaluated_batch;
+pub mod evaluated_batch;
pub mod exec;
mod index;
pub mod operand_evaluator;
diff --git a/rust/sedona-spatial-join/src/operand_evaluator.rs b/rust/sedona-spatial-join/src/operand_evaluator.rs
index b76e9116..8b431396 100644
--- a/rust/sedona-spatial-join/src/operand_evaluator.rs
+++ b/rust/sedona-spatial-join/src/operand_evaluator.rs
@@ -90,7 +90,7 @@ pub(crate) fn create_operand_evaluator(
}
/// Result of evaluating a geometry batch.
-pub(crate) struct EvaluatedGeometryArray {
+pub struct EvaluatedGeometryArray {
/// The array of geometries produced by evaluating the geometry expression.
pub geometry_array: ArrayRef,
/// The rects of the geometries in the geometry array. The length of this array is equal to the number of geometries.
diff --git a/rust/sedona-spatial-join/src/utils.rs b/rust/sedona-spatial-join/src/utils.rs
index 6db8f85a..3f53df47 100644
--- a/rust/sedona-spatial-join/src/utils.rs
+++ b/rust/sedona-spatial-join/src/utils.rs
@@ -21,3 +21,4 @@ pub(crate) mod concurrent_reservation;
pub(crate) mod init_once_array;
pub(crate) mod join_utils;
pub(crate) mod once_fut;
+pub(crate) mod spill;
diff --git a/rust/sedona-spatial-join/src/utils/arrow_utils.rs b/rust/sedona-spatial-join/src/utils/arrow_utils.rs
index 367568fc..258b1f8a 100644
--- a/rust/sedona-spatial-join/src/utils/arrow_utils.rs
+++ b/rust/sedona-spatial-join/src/utils/arrow_utils.rs
@@ -21,10 +21,21 @@ use arrow::array::{Array, ArrayData, BinaryViewArray, ListArray, RecordBatch, St
use arrow_array::make_array;
use arrow_array::ArrayRef;
use arrow_array::StructArray;
+use arrow_schema::SchemaRef;
use arrow_schema::{ArrowError, DataType};
use datafusion_common::Result;
use sedona_common::sedona_internal_err;
+/// Checks if the schema contains any view types (Utf8View or BinaryView). Batches
+/// with view types may need special handling (e.g. compaction) before spilling
+/// or holding in memory for extended periods.
+pub(crate) fn schema_contains_view_types(schema: &SchemaRef) -> bool {
+ schema
+ .flattened_fields()
+ .iter()
+ .any(|field| matches!(field.data_type(), DataType::Utf8View | DataType::BinaryView))
+}
+
+/// Reconstruct `batch` to organize the payload buffers of each `StringViewArray` and
/// `BinaryViewArray` in sequential order by calling `gc()` on them.
///
@@ -246,9 +257,55 @@ mod tests {
use arrow_array::{
BinaryViewArray, BooleanArray, ListArray, StringArray, StringViewArray, StructArray,
};
- use arrow_schema::{DataType, Field, Schema};
+ use arrow_schema::{DataType, Field, Fields, Schema};
use std::sync::Arc;
+ fn make_schema(fields: Vec<Field>) -> SchemaRef {
+ Arc::new(Schema::new(fields))
+ }
+
+ #[test]
+ fn test_schema_contains_view_types_top_level() {
+ let schema_ref = make_schema(vec![
+ Field::new("a", DataType::Utf8View, true),
+ Field::new("b", DataType::BinaryView, true),
+ ]);
+
+ assert!(schema_contains_view_types(&schema_ref));
+
+ // Similar shape but without view types
+ let schema_no_view = make_schema(vec![
+ Field::new("a", DataType::Utf8, true),
+ Field::new("b", DataType::Binary, true),
+ ]);
+ assert!(!schema_contains_view_types(&schema_no_view));
+ }
+
+ #[test]
+ fn test_schema_contains_view_types_nested() {
+ let nested = Field::new(
+ "s",
+ DataType::Struct(Fields::from(vec![Field::new(
+ "v",
+ DataType::Utf8View,
+ true,
+ )])),
+ true,
+ );
+
+ let schema_ref = make_schema(vec![nested]);
+ assert!(schema_contains_view_types(&schema_ref));
+
+ // Nested struct without any view types
+ let nested_no_view = Field::new(
+ "s",
+ DataType::Struct(Fields::from(vec![Field::new("v", DataType::Utf8, true)])),
+ true,
+ );
+ let schema_no_view = make_schema(vec![nested_no_view]);
+ assert!(!schema_contains_view_types(&schema_no_view));
+ }
+
#[test]
fn test_string_view_array_memory_size() {
let array = StringViewArray::from(vec![
@@ -523,12 +580,12 @@ mod tests {
let array: ArrayRef = Arc::new(struct_array);
let slice = array.slice(0, 2);
- let before_size = get_array_memory_size(&array).unwrap();
+ let before_size = slice.get_array_memory_size();
let (compacted, mutated) = compact_array(Arc::new(slice)).unwrap();
assert!(mutated);
- let after_size = get_array_memory_size(&compacted).unwrap();
+ let after_size = compacted.get_array_memory_size();
assert!(after_size < before_size);
}
@@ -548,12 +605,12 @@ mod tests {
}
let bv_list: ListArray = bv_list_builder.finish();
let sliced: ArrayRef = Arc::new(bv_list.slice(0, 1));
- let before_size = get_array_memory_size(&sliced).unwrap();
+ let before_size = sliced.get_array_memory_size();
let (compacted, mutated) = compact_array(Arc::clone(&sliced)).unwrap();
assert!(mutated);
- let after_size = get_array_memory_size(&compacted).unwrap();
+ let after_size = compacted.get_array_memory_size();
assert!(after_size <= before_size);
}
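
The hazard that `schema_contains_view_types` flags is easy to reproduce with arrow-rs alone: a small slice of a view array keeps the entire payload buffer alive until `gc()` rewrites it. A minimal, self-contained sketch (the row count and string length are arbitrary choices):

// Sketch: why sliced view arrays are compacted before spilling.
use arrow_array::{Array, StringViewArray};

fn main() {
    // 1024 long strings; values over 12 bytes live in shared payload buffers.
    let values: Vec<String> = (0..1024).map(|i| format!("{i}-{}", "x".repeat(64))).collect();
    let array = StringViewArray::from(values.iter().map(|s| s.as_str()).collect::<Vec<_>>());

    // A 2-row slice still references the full payload buffer...
    let slice = array.slice(0, 2);
    let before = slice.get_array_memory_size();

    // ...while gc() rewrites the two views into a compact buffer.
    let compacted = slice.gc();
    let after = compacted.get_array_memory_size();
    assert!(after < before);
    println!("before: {before} bytes, after gc(): {after} bytes");
}
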
diff --git a/rust/sedona-spatial-join/src/utils/spill.rs b/rust/sedona-spatial-join/src/utils/spill.rs
new file mode 100644
index 00000000..fef585a5
--- /dev/null
+++ b/rust/sedona-spatial-join/src/utils/spill.rs
@@ -0,0 +1,382 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{fs::File, io::BufReader, sync::Arc};
+
+use arrow::ipc::{
+ reader::StreamReader,
+ writer::{IpcWriteOptions, StreamWriter},
+};
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
+use datafusion::config::SpillCompression;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_execution::{disk_manager::RefCountedTempFile, runtime_env::RuntimeEnv};
+use datafusion_physical_plan::metrics::SpillMetrics;
+
+use crate::utils::arrow_utils::{
+ compact_batch, get_record_batch_memory_size, schema_contains_view_types,
+};
+
+/// Generic Arrow IPC stream spill writer for [`RecordBatch`].
+///
+/// Shared between multiple components so spill metrics are updated consistently.
+pub(crate) struct RecordBatchSpillWriter {
+ in_progress_file: RefCountedTempFile,
+ writer: StreamWriter<File>,
+ metrics: SpillMetrics,
+ batch_size_threshold: Option<usize>,
+ gc_view_arrays: bool,
+}
+
+impl RecordBatchSpillWriter {
+ pub fn try_new(
+ env: Arc<RuntimeEnv>,
+ schema: SchemaRef,
+ request_description: &str,
+ compression: SpillCompression,
+ metrics: SpillMetrics,
+ batch_size_threshold: Option<usize>,
+ ) -> Result<Self> {
+ let in_progress_file = env.disk_manager.create_tmp_file(request_description)?;
+ let file = File::create(in_progress_file.path())?;
+
+ let mut write_options = IpcWriteOptions::default();
+ write_options = write_options.try_with_compression(compression.into())?;
+
+ let writer = StreamWriter::try_new_with_options(file, schema.as_ref(), write_options)?;
+ metrics.spill_file_count.add(1);
+
+ let gc_view_arrays = schema_contains_view_types(&schema);
+
+ Ok(Self {
+ in_progress_file,
+ writer,
+ metrics,
+ batch_size_threshold,
+ gc_view_arrays,
+ })
+ }
+
+ /// Write a record batch to the spill file.
+ ///
+ /// If `batch_size_threshold` is configured and the in-memory size of the batch exceeds the
+ /// threshold, this will automatically split the batch into smaller slices and (optionally)
+ /// compact each slice before writing.
+ pub fn write_batch(&mut self, batch: RecordBatch) -> Result<()> {
+ let num_rows = batch.num_rows();
+ if num_rows == 0 {
+ // Preserve "empty batch" semantics: callers may rely on spilling and reading back a
+ // zero-row batch (e.g. as a sentinel for an empty stream).
+ return self.write_one_batch(batch);
+ }
+
+ let rows_per_split = self.calculate_rows_per_split(&batch, num_rows)?;
+ if rows_per_split < num_rows {
+ let mut offset = 0;
+ while offset < num_rows {
+ let length = std::cmp::min(rows_per_split, num_rows - offset);
+ let slice = batch.slice(offset, length);
+ self.write_one_batch(slice)?;
+ offset += length;
+ }
+ } else {
+ self.write_one_batch(batch)?;
+ }
+ Ok(())
+ }
+
+ fn calculate_rows_per_split(&self, batch: &RecordBatch, num_rows: usize) -> Result<usize> {
+ let Some(threshold) = self.batch_size_threshold else {
+ return Ok(num_rows);
+ };
+ if threshold == 0 {
+ return Ok(num_rows);
+ }
+
+ let batch_size = get_record_batch_memory_size(batch)?;
+ if batch_size <= threshold {
+ return Ok(num_rows);
+ }
+
+ let num_splits = batch_size.div_ceil(threshold);
+ let rows = num_rows.div_ceil(num_splits);
+ Ok(std::cmp::max(1, rows))
+ }
+
+ fn write_one_batch(&mut self, batch: RecordBatch) -> Result<()> {
+ // Writing record batches containing sparse binary view arrays may lead to excessive
+ // disk usage and slow read performance later. Compact such batches before writing.
+ let batch = if self.gc_view_arrays {
+ compact_batch(batch)?
+ } else {
+ batch
+ };
+ self.writer.write(&batch).map_err(|e| {
+ DataFusionError::Execution(format!(
+ "Failed to write RecordBatch to spill file {:?}: {}",
+ self.in_progress_file.path(),
+ e
+ ))
+ })?;
+
+ self.metrics.spilled_rows.add(batch.num_rows());
+ Ok(())
+ }
+
+ pub fn finish(mut self) -> Result<RefCountedTempFile> {
+ self.writer.finish()?;
+
+ let mut in_progress_file = self.in_progress_file;
+ in_progress_file.update_disk_usage()?;
+ let size = in_progress_file.current_disk_usage();
+ self.metrics.spilled_bytes.add(size as usize);
+ Ok(in_progress_file)
+ }
+}
+
+/// Generic Arrow IPC stream spill reader for [`RecordBatch`].
+pub(crate) struct RecordBatchSpillReader {
+ stream_reader: StreamReader<BufReader<File>>,
+}
+
+impl RecordBatchSpillReader {
+ pub fn try_new(temp_file: &RefCountedTempFile) -> Result<Self> {
+ let file = File::open(temp_file.path())?;
+ let mut stream_reader = StreamReader::try_new_buffered(file, None)?;
+
+ // SAFETY: spill writers in this crate strictly follow the Arrow IPC specification.
+ // Skipping redundant validation speeds up reads.
+ unsafe {
+ stream_reader = stream_reader.with_skip_validation(true);
+ }
+
+ Ok(Self { stream_reader })
+ }
+
+ pub fn schema(&self) -> SchemaRef {
+ self.stream_reader.schema()
+ }
+
+ pub fn next_batch(&mut self) -> Option<Result<RecordBatch>> {
+ self.stream_reader
+ .next()
+ .map(|result| result.map_err(|e| e.into()))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow_array::builder::BinaryViewBuilder;
+ use arrow_array::{Int32Array, StringArray};
+ use arrow_schema::{DataType, Field, Schema};
+ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+
+ fn create_test_runtime_env() -> Result<Arc<RuntimeEnv>> {
+ Ok(Arc::new(RuntimeEnv::default()))
+ }
+
+ fn create_test_schema() -> SchemaRef {
+ Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, true),
+ ]))
+ }
+
+ fn create_test_record_batch(num_rows: usize) -> RecordBatch {
+ let ids: Int32Array = (0..num_rows as i32).collect();
+
+ let names: StringArray = (0..num_rows)
+ .map(|i| {
+ if i % 3 == 0 {
+ None
+ } else {
+ Some(format!("name_{i}"))
+ }
+ })
+ .collect();
+
+ RecordBatch::try_new(create_test_schema(), vec![Arc::new(ids), Arc::new(names)]).unwrap()
+ }
+
+ fn create_test_binary_view_batch(num_rows: usize, value_len: usize) -> RecordBatch {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "payload",
+ DataType::BinaryView,
+ false,
+ )]));
+
+ let mut builder = BinaryViewBuilder::new();
+ for i in 0..num_rows {
+ let byte = b'a' + (i % 26) as u8;
+ let bytes = vec![byte; value_len];
+ builder.append_value(bytes.as_slice());
+ }
+
+ let array = Arc::new(builder.finish());
+ RecordBatch::try_new(schema, vec![array]).unwrap()
+ }
+
+ #[test]
+ fn test_record_batch_spill_empty_batch_round_trip() -> Result<()> {
+ let env = create_test_runtime_env()?;
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let metrics = SpillMetrics::new(&metrics_set, 0);
+
+ let schema = create_test_schema();
+ let mut writer = RecordBatchSpillWriter::try_new(
+ env,
+ schema.clone(),
+ "test_record_batch_spill_empty",
+ SpillCompression::Uncompressed,
+ metrics.clone(),
+ None,
+ )?;
+
+ let empty = create_test_record_batch(0);
+ writer.write_batch(empty)?;
+ let file = writer.finish()?;
+
+ assert_eq!(metrics.spill_file_count.value(), 1);
+ assert_eq!(metrics.spilled_rows.value(), 0);
+
+ let mut reader = RecordBatchSpillReader::try_new(&file)?;
+ let read = reader.next_batch().unwrap()?;
+ assert_eq!(read.num_rows(), 0);
+ assert_eq!(read.schema(), schema);
+ assert!(reader.next_batch().is_none());
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_record_batch_spill_round_trip() -> Result<()> {
+ let env = create_test_runtime_env()?;
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let metrics = SpillMetrics::new(&metrics_set, 0);
+
+ let schema = create_test_schema();
+ let mut writer = RecordBatchSpillWriter::try_new(
+ env,
+ schema.clone(),
+ "test_record_batch_spill",
+ SpillCompression::Uncompressed,
+ metrics.clone(),
+ None,
+ )?;
+
+ let batch1 = create_test_record_batch(5);
+ let batch2 = create_test_record_batch(3);
+ writer.write_batch(batch1)?;
+ writer.write_batch(batch2)?;
+
+ let file = writer.finish()?;
+
+ assert_eq!(metrics.spill_file_count.value(), 1);
+ assert_eq!(metrics.spilled_rows.value(), 8);
+ assert!(metrics.spilled_bytes.value() > 0);
+
+ let mut reader = RecordBatchSpillReader::try_new(&file)?;
+ assert_eq!(reader.schema(), schema);
+
+ let read1 = reader.next_batch().unwrap()?;
+ assert_eq!(read1.num_rows(), 5);
+ let read2 = reader.next_batch().unwrap()?;
+ assert_eq!(read2.num_rows(), 3);
+ assert!(reader.next_batch().is_none());
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_record_batch_spill_auto_splitting() -> Result<()> {
+ let env = create_test_runtime_env()?;
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let metrics = SpillMetrics::new(&metrics_set, 0);
+
+ let schema = create_test_schema();
+ // Force splitting by setting a tiny threshold.
+ let mut writer = RecordBatchSpillWriter::try_new(
+ env,
+ schema.clone(),
+ "test_record_batch_spill_split",
+ SpillCompression::Uncompressed,
+ metrics.clone(),
+ Some(1),
+ )?;
+
+ let batch = create_test_record_batch(10);
+ writer.write_batch(batch)?;
+ let file = writer.finish()?;
+
+ // Rows should reflect the logical input rows, even if internally split.
+ assert_eq!(metrics.spilled_rows.value(), 10);
+ assert!(metrics.spilled_bytes.value() > 0);
+
+ // Reader should be able to read all rows back across multiple batches.
+ let mut reader = RecordBatchSpillReader::try_new(&file)?;
+ let mut total_rows = 0;
+ while let Some(batch) = reader.next_batch() {
+ total_rows += batch?.num_rows();
+ }
+ assert_eq!(total_rows, 10);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_record_batch_spill_sliced_binary_view_not_excessive() -> Result<()> {
+ let env = create_test_runtime_env()?;
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let metrics = SpillMetrics::new(&metrics_set, 0);
+
+ // Use a long payload so the view-value buffers dominate overhead, making the
+ // size comparison stable across platforms.
+ const NUM_ROWS: usize = 100;
+ const NUM_SLICES: usize = 10;
+ const VALUE_LEN: usize = 8 * 1024;
+
+ let batch = create_test_binary_view_batch(NUM_ROWS, VALUE_LEN);
+ let batch_size = get_record_batch_memory_size(&batch)?;
+
+ let mut writer = RecordBatchSpillWriter::try_new(
+ env,
+ batch.schema(),
+ "test_record_batch_spill_sliced_binary_view",
+ SpillCompression::Uncompressed,
+ metrics.clone(),
+ None,
+ )?;
+
+ let rows_per_slice = NUM_ROWS / NUM_SLICES;
+ assert_eq!(rows_per_slice * NUM_SLICES, NUM_ROWS);
+ for i in 0..NUM_SLICES {
+ let slice = batch.slice(i * rows_per_slice, rows_per_slice);
+ writer.write_batch(slice)?;
+ }
+
+ let file = writer.finish()?;
+ let spill_size = file.current_disk_usage() as usize;
+ assert!(
+ spill_size <= (batch_size as f64 * 1.2) as usize,
+ "spill file unexpectedly large for sliced BinaryView batch:
spill_size={spill_size}, batch_size={batch_size}"
+ );
+
+ Ok(())
+ }
+}
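
To make the splitting arithmetic in `calculate_rows_per_split` concrete, here is a standalone restatement with worked numbers (a sketch, not part of the commit):

// Sketch: the same ceil-division logic as calculate_rows_per_split above.
fn rows_per_split(batch_size: usize, threshold: usize, num_rows: usize) -> usize {
    if threshold == 0 || batch_size <= threshold {
        return num_rows; // no splitting needed
    }
    let num_splits = batch_size.div_ceil(threshold);
    std::cmp::max(1, num_rows.div_ceil(num_splits))
}

fn main() {
    // A 1000-byte, 10-row batch with a 300-byte threshold needs
    // ceil(1000/300) = 4 splits, i.e. ceil(10/4) = 3 rows per slice,
    // so write_batch emits slices of 3, 3, 3, and 1 rows.
    assert_eq!(rows_per_split(1000, 300, 10), 3);
    // At or under the threshold, the batch is written whole.
    assert_eq!(rows_per_split(200, 300, 10), 10);
    // A pathologically small threshold still yields at least 1 row per slice.
    assert_eq!(rows_per_split(1000, 1, 10), 1);
}
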