This is an automated email from the ASF dual-hosted git repository.

kontinuation pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b6e2e73 feat(rust/sedona-spatial-join) Spill EvaluatedBatch and add external evaluated batch stream (#522)
9b6e2e73 is described below

commit 9b6e2e735da9f2c26c25acc29a0d3aee011e2786
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Mon Jan 19 11:53:02 2026 +0800

    feat(rust/sedona-spatial-join) Spill EvaluatedBatch and add external evaluated batch stream (#522)
    
    Add spill reader and writer for evaluated batch stream. `EvaluatedBatch` is wrapped as a regular `RecordBatch` with the following fields (sketched below):
    
    * `data`: The original batch in `EvaluatedBatch::batch`
    * `geom`: The value of `EvaluatedBatch::geom_array::geometry_array`
    * `dist`: The value of `EvaluatedBatch::geom_array::distance`
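    
    To make the wrapped layout concrete, here is a minimal arrow-rs sketch (an
    illustration only, not the committed code); the single `id` column and the
    one-byte `geom` values are placeholders:
    
        use std::sync::Arc;
    
        use arrow_array::{ArrayRef, BinaryArray, Float64Array, Int32Array, RecordBatch, StructArray};
        use arrow_schema::{ArrowError, DataType, Field, Fields, Schema};
    
        fn main() -> Result<(), ArrowError> {
            // The original batch's columns are nested under a single `data` struct column.
            let data_fields = Fields::from(vec![Field::new("id", DataType::Int32, false)]);
            let ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
            let data = StructArray::try_new(data_fields.clone(), vec![ids], None)?;
    
            // `geom` carries the geometry storage (WKB bytes); `dist` is the optional distance.
            let geom: ArrayRef = Arc::new(BinaryArray::from(vec![&b"\x01"[..]; 3]));
            let dist: ArrayRef = Arc::new(Float64Array::from(vec![Some(10.0); 3]));
    
            let spill_schema = Schema::new(vec![
                Field::new("data", DataType::Struct(data_fields), false),
                Field::new("geom", DataType::Binary, true),
                Field::new("dist", DataType::Float64, true),
            ]);
            let spilled = RecordBatch::try_new(
                Arc::new(spill_schema),
                vec![Arc::new(data) as ArrayRef, geom, dist],
            )?;
            assert_eq!(spilled.num_columns(), 3);
            Ok(())
        }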
    
    `ExternalEvaluatedBatchStream` implements `EvaluatedBatchStream` and produces `EvaluatedBatch` from spill files.
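    
    A condensed round trip, based on the tests and benchmarks in this change
    (creating the runtime env, schema, metrics, and the input batch is left to
    the caller):
    
        use std::sync::Arc;
    
        use arrow_schema::SchemaRef;
        use datafusion::config::SpillCompression;
        use datafusion_common::Result;
        use datafusion_execution::runtime_env::RuntimeEnv;
        use datafusion_physical_plan::metrics::SpillMetrics;
        use futures::StreamExt;
        use sedona_schema::datatypes::WKB_GEOMETRY;
        use sedona_spatial_join::evaluated_batch::evaluated_batch_stream::external::ExternalEvaluatedBatchStream;
        use sedona_spatial_join::evaluated_batch::spill::EvaluatedBatchSpillWriter;
        use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
    
        /// Spill one evaluated batch to disk, then stream it back and count rows.
        async fn round_trip(
            env: Arc<RuntimeEnv>,
            schema: SchemaRef,
            metrics: SpillMetrics,
            evaluated: &EvaluatedBatch,
        ) -> Result<usize> {
            let mut writer = EvaluatedBatchSpillWriter::try_new(
                env,
                schema,
                &WKB_GEOMETRY,
                "round_trip",
                SpillCompression::Uncompressed,
                metrics,
                None,
            )?;
            writer.append(evaluated)?;
            let file = writer.finish()?;
    
            let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(file))?;
            let mut rows = 0usize;
            while let Some(batch) = stream.next().await {
                rows += batch?.num_rows();
            }
            Ok(rows)
        }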
    
    Co-authored-by: Dewey Dunnington <[email protected]>
---
 Cargo.lock                                         |   1 +
 Cargo.toml                                         |   1 +
 rust/sedona-spatial-join/Cargo.toml                |  11 +
 .../external_evaluated_batch_stream.rs             | 165 +++++
 .../bench/evaluated_batch/spill.rs                 | 210 ++++++
 rust/sedona-spatial-join/src/evaluated_batch.rs    |   5 +-
 .../src/evaluated_batch/evaluated_batch_stream.rs  |   1 +
 .../evaluated_batch_stream/evaluate.rs             |  66 +-
 .../evaluated_batch_stream/external.rs             | 638 ++++++++++++++++
 .../src/evaluated_batch/spill.rs                   | 806 +++++++++++++++++++++
 .../src/index/build_side_collector.rs              |   4 +-
 rust/sedona-spatial-join/src/lib.rs                |   2 +-
 rust/sedona-spatial-join/src/operand_evaluator.rs  |   2 +-
 rust/sedona-spatial-join/src/utils.rs              |   1 +
 rust/sedona-spatial-join/src/utils/arrow_utils.rs  |  67 +-
 rust/sedona-spatial-join/src/utils/spill.rs        | 382 ++++++++++
 16 files changed, 2286 insertions(+), 76 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 46f9ff1b..f99ff5ce 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5404,6 +5404,7 @@ dependencies = [
  "geos",
  "once_cell",
  "parking_lot",
+ "pin-project-lite",
  "rand",
  "rstest",
  "sedona-common",
diff --git a/Cargo.toml b/Cargo.toml
index 99e084e9..6c20d23d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -92,6 +92,7 @@ dirs = "6.0.0"
 env_logger = "0.11"
 fastrand = "2.0"
 futures = "0.3"
+pin-project-lite = "0.2"
 glam = "0.30.10"
 object_store = { version = "0.12.4", default-features = false }
 float_next_after = "1"
diff --git a/rust/sedona-spatial-join/Cargo.toml b/rust/sedona-spatial-join/Cargo.toml
index b4c8f630..9831c59b 100644
--- a/rust/sedona-spatial-join/Cargo.toml
+++ b/rust/sedona-spatial-join/Cargo.toml
@@ -45,6 +45,7 @@ datafusion-physical-plan = { workspace = true }
 datafusion-execution = { workspace = true }
 datafusion-common-runtime = { workspace = true }
 futures = { workspace = true }
+pin-project-lite = { workspace = true }
 once_cell = { workspace = true }
 parking_lot = { workspace = true }
 geo = { workspace = true }
@@ -89,3 +90,13 @@ harness = false
 name = "flat"
 path = "bench/partitioning/flat.rs"
 harness = false
+
+[[bench]]
+name = "evaluated_batch_spill"
+path = "bench/evaluated_batch/spill.rs"
+harness = false
+
+[[bench]]
+name = "external_evaluated_batch_stream"
+path = "bench/evaluated_batch/external_evaluated_batch_stream.rs"
+harness = false
diff --git a/rust/sedona-spatial-join/bench/evaluated_batch/external_evaluated_batch_stream.rs b/rust/sedona-spatial-join/bench/evaluated_batch/external_evaluated_batch_stream.rs
new file mode 100644
index 00000000..950502be
--- /dev/null
+++ b/rust/sedona-spatial-join/bench/evaluated_batch/external_evaluated_batch_stream.rs
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::hint::black_box;
+use std::sync::Arc;
+
+use arrow_array::{Int32Array, RecordBatch, StringArray};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
+use datafusion::config::SpillCompression;
+use datafusion_common::ScalarValue;
+use datafusion_execution::runtime_env::RuntimeEnv;
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
+use futures::StreamExt;
+use sedona_schema::datatypes::{SedonaType, WKB_GEOMETRY, WKB_VIEW_GEOMETRY};
+use sedona_spatial_join::evaluated_batch::evaluated_batch_stream::external::ExternalEvaluatedBatchStream;
+use sedona_spatial_join::evaluated_batch::spill::EvaluatedBatchSpillWriter;
+use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+use sedona_spatial_join::operand_evaluator::EvaluatedGeometryArray;
+use sedona_testing::create::create_array_storage;
+
+const ROWS: usize = 8192;
+const BATCHES_PER_FILE: usize = 64;
+
+fn make_schema() -> SchemaRef {
+    Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("name", DataType::Utf8, true),
+    ]))
+}
+
+fn make_evaluated_batch(num_rows: usize, sedona_type: &SedonaType) -> EvaluatedBatch {
+    let schema = make_schema();
+    let ids: Vec<i32> = (0..num_rows).map(|v| v as i32).collect();
+    let id_array = Arc::new(Int32Array::from(ids));
+    let name_array = Arc::new(StringArray::from(vec![Some("Alice"); num_rows]));
+    let batch = RecordBatch::try_new(schema, vec![id_array, name_array])
+        .expect("failed to build record batch for benchmark");
+
+    // Use sedona-testing helpers so this benchmark stays focused on spill + stream I/O.
+    // This builds either a Binary (WKB_GEOMETRY) or BinaryView (WKB_VIEW_GEOMETRY) array.
+    let wkt_values = vec![Some("POINT (0 0)"); num_rows];
+    let geom_array = create_array_storage(&wkt_values, sedona_type);
+
+    let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, sedona_type)
+        .expect("failed to build geometry array for benchmark");
+
+    geom_array.distance = Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0))));
+
+    EvaluatedBatch { batch, geom_array }
+}
+
+fn write_spill_file(
+    env: Arc<RuntimeEnv>,
+    schema: SchemaRef,
+    metrics_set: &ExecutionPlanMetricsSet,
+    sedona_type: &SedonaType,
+    compression: SpillCompression,
+    evaluated_batch: &EvaluatedBatch,
+) -> Arc<datafusion_execution::disk_manager::RefCountedTempFile> {
+    let metrics = SpillMetrics::new(metrics_set, 0);
+    let mut writer = EvaluatedBatchSpillWriter::try_new(
+        env,
+        schema,
+        sedona_type,
+        "bench_external_stream",
+        compression,
+        metrics,
+        None,
+    )
+    .expect("failed to create spill writer for benchmark");
+
+    for _ in 0..BATCHES_PER_FILE {
+        writer
+            .append(evaluated_batch)
+            .expect("failed to append batch in benchmark");
+    }
+
+    Arc::new(writer.finish().expect("failed to finish spill writer"))
+}
+
+fn bench_external_evaluated_batch_stream(c: &mut Criterion) {
+    let env = Arc::new(RuntimeEnv::default());
+    let schema = make_schema();
+    let metrics_set = ExecutionPlanMetricsSet::new();
+
+    let compressions = [
+        ("uncompressed", SpillCompression::Uncompressed),
+        ("lz4", SpillCompression::Lz4Frame),
+    ];
+
+    let runtime = tokio::runtime::Builder::new_current_thread()
+        .build()
+        .expect("failed to create tokio runtime");
+
+    for (label, sedona_type) in [("wkb", WKB_GEOMETRY), ("wkb_view", WKB_VIEW_GEOMETRY)] {
+        let evaluated_batch = make_evaluated_batch(ROWS, &sedona_type);
+
+        for (compression_label, compression) in compressions {
+            let spill_file = write_spill_file(
+                Arc::clone(&env),
+                Arc::clone(&schema),
+                &metrics_set,
+                &sedona_type,
+                compression,
+                &evaluated_batch,
+            );
+
+            let mut group = c.benchmark_group(format!(
+                "external_evaluated_batch_stream/{label}/{compression_label}"
+            ));
+            group.throughput(Throughput::Elements((ROWS * BATCHES_PER_FILE) as u64));
+
+            group.bench_with_input(
+                BenchmarkId::new(
+                    "external_stream",
+                    format!("rows_{ROWS}_batches_{BATCHES_PER_FILE}"),
+                ),
+                &spill_file,
+                |b, file| {
+                    b.iter(|| {
+                        runtime.block_on(async {
+                            let stream =
+                                ExternalEvaluatedBatchStream::try_from_spill_file(Arc::clone(file))
+                                    .expect("failed to create external evaluated batch stream");
+                            futures::pin_mut!(stream);
+
+                            let mut rows = 0usize;
+                            while let Some(batch) = stream.next().await {
+                                let batch =
+                                    batch.expect("failed to read evaluated batch from stream");
+                                rows += batch.num_rows();
+                                black_box(batch);
+                            }
+                            black_box(rows);
+                        })
+                    })
+                },
+            );
+
+            group.finish();
+        }
+    }
+}
+
+criterion_group!(
+    external_evaluated_batch_stream,
+    bench_external_evaluated_batch_stream
+);
+criterion_main!(external_evaluated_batch_stream);
diff --git a/rust/sedona-spatial-join/bench/evaluated_batch/spill.rs b/rust/sedona-spatial-join/bench/evaluated_batch/spill.rs
new file mode 100644
index 00000000..b87a6b9c
--- /dev/null
+++ b/rust/sedona-spatial-join/bench/evaluated_batch/spill.rs
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::hint::black_box;
+use std::sync::Arc;
+
+use arrow_array::{Int32Array, RecordBatch, StringArray};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
+use datafusion::config::SpillCompression;
+use datafusion_common::ScalarValue;
+use datafusion_execution::runtime_env::RuntimeEnv;
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
+use sedona_schema::datatypes::{SedonaType, WKB_GEOMETRY, WKB_VIEW_GEOMETRY};
+use sedona_spatial_join::evaluated_batch::spill::{
+    EvaluatedBatchSpillReader, EvaluatedBatchSpillWriter,
+};
+use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+use sedona_spatial_join::operand_evaluator::EvaluatedGeometryArray;
+use sedona_testing::create::create_array_storage;
+
+const ROWS: usize = 8192;
+const BATCHES_PER_FILE: usize = 64;
+
+fn make_schema() -> SchemaRef {
+    Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("name", DataType::Utf8, true),
+    ]))
+}
+
+fn make_evaluated_batch(num_rows: usize, sedona_type: &SedonaType) -> EvaluatedBatch {
+    let schema = make_schema();
+    let ids: Vec<i32> = (0..num_rows).map(|v| v as i32).collect();
+    let id_array = Arc::new(Int32Array::from(ids));
+    let name_array = Arc::new(StringArray::from(vec![Some("Alice"); num_rows]));
+    let batch = RecordBatch::try_new(schema, vec![id_array, name_array])
+        .expect("failed to build record batch for benchmark");
+
+    // Use sedona-testing helpers so this benchmark stays focused on spill I/O.
+    // This builds either a Binary (WKB_GEOMETRY) or BinaryView (WKB_VIEW_GEOMETRY) array.
+    let wkt_values = vec![Some("POINT (0 0)"); num_rows];
+    let geom_array = create_array_storage(&wkt_values, sedona_type);
+
+    let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, sedona_type)
+        .expect("failed to build geometry array for benchmark");
+
+    // Use a scalar distance so the spilled dist column is constant.
+    geom_array.distance = Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0))));
+
+    EvaluatedBatch { batch, geom_array }
+}
+
+fn write_spill_file(
+    env: Arc<RuntimeEnv>,
+    schema: SchemaRef,
+    metrics_set: &ExecutionPlanMetricsSet,
+    sedona_type: &SedonaType,
+    compression: SpillCompression,
+    evaluated_batch: &EvaluatedBatch,
+) -> Arc<datafusion_execution::disk_manager::RefCountedTempFile> {
+    let metrics = SpillMetrics::new(metrics_set, 0);
+    let mut writer = EvaluatedBatchSpillWriter::try_new(
+        env,
+        schema,
+        sedona_type,
+        "bench_spill",
+        compression,
+        metrics,
+        None,
+    )
+    .expect("failed to create spill writer for benchmark");
+
+    for _ in 0..BATCHES_PER_FILE {
+        writer
+            .append(evaluated_batch)
+            .expect("failed to append batch in benchmark");
+    }
+
+    Arc::new(writer.finish().expect("failed to finish spill writer"))
+}
+
+fn bench_spill_writer_and_reader(c: &mut Criterion) {
+    let env = Arc::new(RuntimeEnv::default());
+    let schema = make_schema();
+    let metrics_set = ExecutionPlanMetricsSet::new();
+
+    let compressions = [
+        ("uncompressed", SpillCompression::Uncompressed),
+        ("lz4", SpillCompression::Lz4Frame),
+    ];
+
+    for (label, sedona_type) in [("wkb", WKB_GEOMETRY), ("wkb_view", WKB_VIEW_GEOMETRY)] {
+        let evaluated_batch = make_evaluated_batch(ROWS, &sedona_type);
+
+        for (compression_label, compression) in compressions {
+            // Prepare a stable spill file for read benchmarks.
+            let spill_file = write_spill_file(
+                Arc::clone(&env),
+                Arc::clone(&schema),
+                &metrics_set,
+                &sedona_type,
+                compression,
+                &evaluated_batch,
+            );
+
+            let mut group =
+                c.benchmark_group(format!("evaluated_batch_spill/{label}/{compression_label}"));
+            group.throughput(Throughput::Elements((ROWS * BATCHES_PER_FILE) as u64));
+
+            group.bench_with_input(
+                BenchmarkId::new(
+                    "spill_writer",
+                    format!("rows_{ROWS}_batches_{BATCHES_PER_FILE}"),
+                ),
+                &evaluated_batch,
+                |b, batch| {
+                    b.iter(|| {
+                        let metrics = SpillMetrics::new(&metrics_set, 0);
+                        let mut writer = EvaluatedBatchSpillWriter::try_new(
+                            Arc::clone(&env),
+                            Arc::clone(&schema),
+                            &sedona_type,
+                            "bench_spill",
+                            compression,
+                            metrics,
+                            None,
+                        )
+                        .expect("failed to create spill writer");
+
+                        for _ in 0..BATCHES_PER_FILE {
+                            writer.append(black_box(batch)).unwrap();
+                        }
+
+                        let file = writer.finish().unwrap();
+                        black_box(file);
+                    })
+                },
+            );
+
+            group.bench_with_input(
+                BenchmarkId::new(
+                    "spill_reader",
+                    format!("rows_{ROWS}_batches_{BATCHES_PER_FILE}"),
+                ),
+                &spill_file,
+                |b, file| {
+                    b.iter(|| {
+                        let mut reader =
+                            EvaluatedBatchSpillReader::try_new(black_box(file.as_ref()))
+                                .expect("failed to create spill reader");
+                        let mut rows = 0usize;
+
+                        while let Some(batch) = reader.next_batch() {
+                            let batch = batch.expect("failed to read evaluated batch");
+                            rows += batch.num_rows();
+                            black_box(batch);
+                        }
+
+                        black_box(rows);
+                    })
+                },
+            );
+
+            group.bench_with_input(
+                BenchmarkId::new(
+                    "spill_reader_raw",
+                    format!("rows_{ROWS}_batches_{BATCHES_PER_FILE}"),
+                ),
+                &spill_file,
+                |b, file| {
+                    b.iter(|| {
+                        let mut reader =
+                            EvaluatedBatchSpillReader::try_new(black_box(file.as_ref()))
+                                .expect("failed to create spill reader");
+                        let mut rows = 0usize;
+
+                        while let Some(batch) = reader.next_raw_batch() {
+                            let batch = batch.expect("failed to read record batch");
+                            rows += batch.num_rows();
+                            black_box(batch);
+                        }
+
+                        black_box(rows);
+                    })
+                },
+            );
+
+            group.finish();
+        }
+    }
+}
+
+criterion_group!(evaluated_batch_spill, bench_spill_writer_and_reader);
+criterion_main!(evaluated_batch_spill);
diff --git a/rust/sedona-spatial-join/src/evaluated_batch.rs b/rust/sedona-spatial-join/src/evaluated_batch.rs
index d44d49ec..aad3f11b 100644
--- a/rust/sedona-spatial-join/src/evaluated_batch.rs
+++ b/rust/sedona-spatial-join/src/evaluated_batch.rs
@@ -27,7 +27,7 @@ use crate::{
 
 /// EvaluatedBatch contains the original record batch from the input stream and the evaluated
 /// geometry array.
-pub(crate) struct EvaluatedBatch {
+pub struct EvaluatedBatch {
     /// Original record batch polled from the stream
     pub batch: RecordBatch,
     /// Evaluated geometry array, containing the geometry array containing geometries to be joined,
@@ -65,4 +65,5 @@ impl EvaluatedBatch {
     }
 }
 
-pub(crate) mod evaluated_batch_stream;
+pub mod evaluated_batch_stream;
+pub mod spill;
diff --git a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs
index eb1f855c..c18761d2 100644
--- a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs
+++ b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs
@@ -39,4 +39,5 @@ pub(crate) trait EvaluatedBatchStream: Stream<Item = Result<EvaluatedBatch>> {
 pub(crate) type SendableEvaluatedBatchStream = Pin<Box<dyn EvaluatedBatchStream + Send>>;
 
 pub(crate) mod evaluate;
+pub mod external;
 pub(crate) mod in_mem;
diff --git a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs
index 0baf3c5f..f6e185c3 100644
--- a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs
+++ b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs
@@ -20,7 +20,6 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use arrow_array::RecordBatch;
-use arrow_schema::{DataType, SchemaRef};
 use datafusion_common::Result;
 use datafusion_physical_plan::{metrics, SendableRecordBatchStream};
 use futures::{Stream, StreamExt};
@@ -30,7 +29,7 @@ use crate::evaluated_batch::{
     EvaluatedBatch,
 };
 use crate::operand_evaluator::{EvaluatedGeometryArray, OperandEvaluator};
-use crate::utils::arrow_utils::compact_batch;
+use crate::utils::arrow_utils::{compact_batch, schema_contains_view_types};
 
 /// An evaluator that can evaluate geometry expressions on record batches
 /// and produces evaluated geometry arrays.
@@ -86,14 +85,6 @@ impl<E: Evaluator> EvaluateOperandBatchStream<E> {
     }
 }
 
-/// Checks if the schema contains any view types (Utf8View or BinaryView).
-fn schema_contains_view_types(schema: &SchemaRef) -> bool {
-    schema
-        .flattened_fields()
-        .iter()
-        .any(|field| matches!(field.data_type(), DataType::Utf8View | DataType::BinaryView))
-}
-
 impl<E: Evaluator> EvaluatedBatchStream for EvaluateOperandBatchStream<E> {
     fn is_external(&self) -> bool {
         false
@@ -160,58 +151,3 @@ pub(crate) fn create_evaluated_probe_stream(
         false,
     ))
 }
-
-#[cfg(test)]
-mod tests {
-    use std::sync::Arc;
-
-    use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
-
-    use super::schema_contains_view_types;
-
-    fn schema(fields: Vec<Field>) -> SchemaRef {
-        Arc::new(Schema::new(fields))
-    }
-
-    #[test]
-    fn test_schema_contains_view_types_top_level() {
-        let schema_ref = schema(vec![
-            Field::new("a", DataType::Utf8View, true),
-            Field::new("b", DataType::BinaryView, true),
-        ]);
-
-        assert!(schema_contains_view_types(&schema_ref));
-
-        // Similar shape but without view types
-        let schema_no_view = schema(vec![
-            Field::new("a", DataType::Utf8, true),
-            Field::new("b", DataType::Binary, true),
-        ]);
-        assert!(!schema_contains_view_types(&schema_no_view));
-    }
-
-    #[test]
-    fn test_schema_contains_view_types_nested() {
-        let nested = Field::new(
-            "s",
-            DataType::Struct(Fields::from(vec![Field::new(
-                "v",
-                DataType::Utf8View,
-                true,
-            )])),
-            true,
-        );
-
-        let schema_ref = schema(vec![nested]);
-        assert!(schema_contains_view_types(&schema_ref));
-
-        // Nested struct without any view types
-        let nested_no_view = Field::new(
-            "s",
-            DataType::Struct(Fields::from(vec![Field::new("v", DataType::Utf8, true)])),
-            true,
-        );
-        let schema_no_view = schema(vec![nested_no_view]);
-        assert!(!schema_contains_view_types(&schema_no_view));
-    }
-}
diff --git a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs
new file mode 100644
index 00000000..67d3538e
--- /dev/null
+++ b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs
@@ -0,0 +1,638 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    collections::VecDeque,
+    iter,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use arrow_array::RecordBatch;
+use arrow_schema::{Schema, SchemaRef};
+use datafusion_common::{DataFusionError, Result};
+use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::{
+    disk_manager::RefCountedTempFile, RecordBatchStream, SendableRecordBatchStream,
+};
+use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
+use futures::{FutureExt, StreamExt};
+use pin_project_lite::pin_project;
+use sedona_common::sedona_internal_err;
+
+use crate::evaluated_batch::{
+    evaluated_batch_stream::EvaluatedBatchStream,
+    spill::{
+        spilled_batch_to_evaluated_batch, spilled_schema_to_evaluated_schema,
+        EvaluatedBatchSpillReader,
+    },
+    EvaluatedBatch,
+};
+
+const RECORD_BATCH_CHANNEL_CAPACITY: usize = 2;
+
+pin_project! {
+    /// Streams [`EvaluatedBatch`] values read back from on-disk spill files.
+    ///
+    /// This stream is intended for the “spilled” path where batches have been written to disk and
+    /// must be read back into memory. It wraps an [`ExternalRecordBatchStream`] and uses
+    /// background tasks to prefetch/forward batches so downstream operators can process a batch
+    /// while the next one is being loaded.
+    pub struct ExternalEvaluatedBatchStream {
+        #[pin]
+        inner: RecordBatchToEvaluatedStream,
+        schema: SchemaRef,
+    }
+}
+
+enum State {
+    AwaitingFile,
+    Opening(SpawnedTask<Result<EvaluatedBatchSpillReader>>),
+    Reading(SpawnedTask<(EvaluatedBatchSpillReader, Option<Result<RecordBatch>>)>),
+    Finished,
+}
+
+impl ExternalEvaluatedBatchStream {
+    /// Creates an external stream from a single spill file.
+    pub fn try_from_spill_file(spill_file: Arc<RefCountedTempFile>) -> Result<Self> {
+        let record_stream =
+            ExternalRecordBatchStream::try_from_spill_files(iter::once(spill_file))?;
+        let evaluated_stream =
+            RecordBatchToEvaluatedStream::try_spawned_evaluated_stream(Box::pin(record_stream))?;
+        let schema = evaluated_stream.schema();
+        Ok(Self {
+            inner: evaluated_stream,
+            schema,
+        })
+    }
+
+    /// Creates an external stream from multiple spill files.
+    ///
+    /// The stream yields the batches from each file in order. When `spill_files` is empty the
+    /// stream is empty (returns `None` immediately) and no schema validation is performed.
+    pub fn try_from_spill_files<I>(schema: SchemaRef, spill_files: I) -> Result<Self>
+    where
+        I: IntoIterator<Item = Arc<RefCountedTempFile>>,
+    {
+        let record_stream = ExternalRecordBatchStream::try_from_spill_files(spill_files)?;
+        if !record_stream.is_empty() {
+            // `ExternalRecordBatchStream` only has a meaningful schema when at least one spill
+            // file is provided. In that case, validate that the caller-provided evaluated schema
+            // matches what would be derived from the spilled schema.
+            let actual_schema = spilled_schema_to_evaluated_schema(&record_stream.schema())?;
+            if schema != actual_schema {
+                return sedona_internal_err!(
+                    "Schema mismatch when creating ExternalEvaluatedBatchStream"
+                );
+            }
+        }
+        let evaluated_stream =
+            RecordBatchToEvaluatedStream::try_spawned_evaluated_stream(Box::pin(record_stream))?;
+        Ok(Self {
+            inner: evaluated_stream,
+            schema,
+        })
+    }
+}
+
+impl EvaluatedBatchStream for ExternalEvaluatedBatchStream {
+    fn is_external(&self) -> bool {
+        true
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
+
+impl futures::Stream for ExternalEvaluatedBatchStream {
+    type Item = Result<EvaluatedBatch>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.project().inner.poll_next(cx)
+    }
+}
+
+pin_project! {
+    /// Adapts a [`RecordBatchStream`] containing spilled batches into an [`EvaluatedBatch`] stream.
+    ///
+    /// Each incoming `RecordBatch` is decoded via [`spilled_batch_to_evaluated_batch`]. This type
+    /// also carries the derived evaluated schema for downstream consumers.
+    struct RecordBatchToEvaluatedStream {
+        #[pin]
+        inner: SendableRecordBatchStream,
+        evaluated_schema: SchemaRef,
+    }
+}
+
+impl RecordBatchToEvaluatedStream {
+    fn try_new(inner: SendableRecordBatchStream) -> Result<Self> {
+        let evaluated_schema = spilled_schema_to_evaluated_schema(&inner.schema())?;
+        Ok(Self {
+            inner,
+            evaluated_schema,
+        })
+    }
+
+    /// Buffers `record_stream` by forwarding it through a bounded channel.
+    ///
+    /// This is primarily useful for [`ExternalRecordBatchStream`], where producing the next batch
+    /// may involve disk I/O and `spawn_blocking` work. By polling the source stream in a spawned
+    /// task, we can overlap “load next batch” with “process current batch”, while still applying
+    /// backpressure via [`RECORD_BATCH_CHANNEL_CAPACITY`].
+    ///
+    /// The forwarding task stops when the receiver is dropped or when the source stream yields its
+    /// first error.
+    fn try_spawned_evaluated_stream(record_stream: SendableRecordBatchStream) -> Result<Self> {
+        let schema = record_stream.schema();
+        let mut builder =
+            RecordBatchReceiverStreamBuilder::new(schema, RECORD_BATCH_CHANNEL_CAPACITY);
+        let tx = builder.tx();
+        builder.spawn(async move {
+            let mut record_stream = record_stream;
+            while let Some(batch) = record_stream.next().await {
+                let is_err = batch.is_err();
+                if tx.send(batch).await.is_err() {
+                    break;
+                }
+                if is_err {
+                    break;
+                }
+            }
+            Ok(())
+        });
+
+        let buffered = builder.build();
+        Self::try_new(buffered)
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.evaluated_schema)
+    }
+}
+
+impl futures::Stream for RecordBatchToEvaluatedStream {
+    type Item = Result<EvaluatedBatch>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+        match this.inner.as_mut().poll_next(cx) {
+            Poll::Ready(Some(Ok(batch))) => {
+                Poll::Ready(Some(spilled_batch_to_evaluated_batch(batch)))
+            }
+            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
+            Poll::Ready(None) => Poll::Ready(None),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+/// Streams raw [`RecordBatch`] values directly from spill files.
+///
+/// This is the lowest-level “read from disk” stream: it opens each spill file, reads the stored
+/// record batches sequentially, and yields them without decoding into [`EvaluatedBatch`].
+///
+/// Schema handling:
+/// - If at least one spill file is provided, the stream schema is taken from the first file.
+/// - If no files are provided, the schema is empty and the stream terminates immediately.
+pub(crate) struct ExternalRecordBatchStream {
+    schema: SchemaRef,
+    state: State,
+    spill_files: VecDeque<Arc<RefCountedTempFile>>,
+    is_empty: bool,
+}
+
+impl ExternalRecordBatchStream {
+    /// Creates a stream over `spill_files`, yielding all batches from each file in order.
+    ///
+    /// This function assumes all spill files were written with a compatible schema.
+    pub fn try_from_spill_files<I>(spill_files: I) -> Result<Self>
+    where
+        I: IntoIterator<Item = Arc<RefCountedTempFile>>,
+    {
+        let spill_files = spill_files.into_iter().collect::<VecDeque<_>>();
+        let (schema, is_empty) = match spill_files.front() {
+            Some(file) => {
+                let reader = EvaluatedBatchSpillReader::try_new(file)?;
+                (reader.schema(), false)
+            }
+            None => (Arc::new(Schema::empty()), true),
+        };
+        Ok(Self {
+            schema,
+            state: State::AwaitingFile,
+            spill_files,
+            is_empty,
+        })
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.is_empty
+    }
+}
+
+impl RecordBatchStream for ExternalRecordBatchStream {
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+}
+
+impl futures::Stream for ExternalRecordBatchStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let self_mut = self.get_mut();
+
+        loop {
+            match &mut self_mut.state {
+                State::AwaitingFile => match self_mut.spill_files.pop_front() {
+                    Some(spill_file) => {
+                        let task = SpawnedTask::spawn_blocking(move || {
+                            EvaluatedBatchSpillReader::try_new(&spill_file)
+                        });
+                        self_mut.state = State::Opening(task);
+                    }
+                    None => {
+                        self_mut.state = State::Finished;
+                        return Poll::Ready(None);
+                    }
+                },
+                State::Opening(task) => match futures::ready!(task.poll_unpin(cx)) {
+                    Err(e) => {
+                        self_mut.state = State::Finished;
+                        return Poll::Ready(Some(Err(DataFusionError::External(Box::new(e)))));
+                    }
+                    Ok(Err(e)) => {
+                        self_mut.state = State::Finished;
+                        return Poll::Ready(Some(Err(e)));
+                    }
+                    Ok(Ok(mut spill_reader)) => {
+                        let task = SpawnedTask::spawn_blocking(move || {
+                            let next_batch = spill_reader.next_raw_batch();
+                            (spill_reader, next_batch)
+                        });
+                        self_mut.state = State::Reading(task);
+                    }
+                },
+                State::Reading(task) => match futures::ready!(task.poll_unpin(cx)) {
+                    Err(e) => {
+                        self_mut.state = State::Finished;
+                        return Poll::Ready(Some(Err(DataFusionError::External(Box::new(e)))));
+                    }
+                    Ok((_, None)) => {
+                        self_mut.state = State::AwaitingFile;
+                        continue;
+                    }
+                    Ok((_, Some(Err(e)))) => {
+                        self_mut.state = State::Finished;
+                        return Poll::Ready(Some(Err(e)));
+                    }
+                    Ok((mut spill_reader, Some(Ok(batch)))) => {
+                        let task = SpawnedTask::spawn_blocking(move || {
+                            let next_batch = spill_reader.next_raw_batch();
+                            (spill_reader, next_batch)
+                        });
+                        self_mut.state = State::Reading(task);
+                        return Poll::Ready(Some(Ok(batch)));
+                    }
+                },
+                State::Finished => {
+                    return Poll::Ready(None);
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::evaluated_batch::spill::EvaluatedBatchSpillWriter;
+    use crate::operand_evaluator::EvaluatedGeometryArray;
+    use arrow_array::{Array, ArrayRef, BinaryArray, Int32Array, RecordBatch, StringArray};
+    use arrow_schema::{DataType, Field, Schema, SchemaRef};
+    use datafusion::config::SpillCompression;
+    use datafusion_common::{Result, ScalarValue};
+    use datafusion_execution::runtime_env::RuntimeEnv;
+    use datafusion_expr::ColumnarValue;
+    use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
+    use futures::StreamExt;
+    use sedona_schema::datatypes::{SedonaType, WKB_GEOMETRY};
+    use std::sync::Arc;
+
+    fn create_test_runtime_env() -> Result<Arc<RuntimeEnv>> {
+        Ok(Arc::new(RuntimeEnv::default()))
+    }
+
+    fn create_test_schema() -> SchemaRef {
+        Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+        ]))
+    }
+
+    fn create_test_record_batch(start_id: i32) -> Result<RecordBatch> {
+        let schema = create_test_schema();
+        let id_array = Arc::new(Int32Array::from(vec![start_id, start_id + 1, start_id + 2]));
+        let name_array = Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob"), None]));
+        RecordBatch::try_new(schema, vec![id_array, name_array]).map_err(|e| e.into())
+    }
+
+    fn create_test_geometry_array() -> Result<(ArrayRef, SedonaType)> {
+        // Create WKB encoded points (simple binary data for testing)
+        let point1_wkb: Vec<u8> = vec![
+            1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 240, 63, 0, 0, 0, 0, 0, 0, 0, 64,
+        ];
+        let point2_wkb: Vec<u8> = vec![
+            1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0, 16, 64,
+        ];
+        let point3_wkb: Vec<u8> = vec![
+            1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 64, 0, 0, 0, 0, 0, 0, 24, 64,
+        ];
+
+        let sedona_type = WKB_GEOMETRY;
+        let geom_array: ArrayRef = Arc::new(BinaryArray::from(vec![
+            Some(point1_wkb.as_slice()),
+            Some(point2_wkb.as_slice()),
+            Some(point3_wkb.as_slice()),
+        ]));
+
+        Ok((geom_array, sedona_type))
+    }
+
+    fn create_test_evaluated_batch(start_id: i32) -> Result<EvaluatedBatch> {
+        let batch = create_test_record_batch(start_id)?;
+        let (geom_array, sedona_type) = create_test_geometry_array()?;
+        let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, &sedona_type)?;
+
+        // Add distance as a scalar value
+        geom_array.distance = Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0))));
+
+        Ok(EvaluatedBatch { batch, geom_array })
+    }
+
+    async fn create_spill_file_with_batches(num_batches: usize) -> Result<RefCountedTempFile> {
+        let env = create_test_runtime_env()?;
+        let schema = create_test_schema();
+        let sedona_type = WKB_GEOMETRY;
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let metrics = SpillMetrics::new(&metrics_set, 0);
+
+        let mut writer = EvaluatedBatchSpillWriter::try_new(
+            env,
+            schema,
+            &sedona_type,
+            "test_external_stream",
+            SpillCompression::Uncompressed,
+            metrics,
+            None,
+        )?;
+
+        for i in 0..num_batches {
+            let batch = create_test_evaluated_batch((i * 3) as i32)?;
+            writer.append(&batch)?;
+        }
+
+        writer.finish()
+    }
+
+    #[tokio::test]
+    async fn test_external_stream_creation() -> Result<()> {
+        let spill_file = create_spill_file_with_batches(1).await?;
+        let stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?;
+
+        assert!(stream.is_external());
+        assert_eq!(stream.schema(), create_test_schema());
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_external_stream_single_batch() -> Result<()> {
+        let spill_file = create_spill_file_with_batches(1).await?;
+        let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?;
+
+        let batch = stream.next().await.unwrap()?;
+        assert_eq!(batch.num_rows(), 3);
+
+        // The stream is exhausted; polling again keeps returning None
+        assert!(stream.next().await.is_none());
+        assert!(stream.next().await.is_none());
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_external_stream_large_number_of_batches() -> Result<()> {
+        let num_batches = 100;
+        let spill_file = create_spill_file_with_batches(num_batches).await?;
+        let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?;
+
+        let mut count = 0;
+        while let Some(batch_result) = stream.next().await {
+            let batch = batch_result?;
+            assert_eq!(batch.num_rows(), 3);
+            count += 1;
+        }
+
+        assert_eq!(count, num_batches);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_external_stream_is_external_flag() -> Result<()> {
+        let spill_file = create_spill_file_with_batches(1).await?;
+        let stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?;
+
+        // Verify the is_external flag returns true
+        assert!(stream.is_external());
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_external_stream_concurrent_access() -> Result<()> {
+        // Create multiple streams reading from different files
+        let file1 = create_spill_file_with_batches(2).await?;
+        let file2 = create_spill_file_with_batches(3).await?;
+
+        let mut stream1 = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(file1))?;
+        let mut stream2 = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(file2))?;
+
+        // Read from both streams
+        let batch1_1 = stream1.next().await.unwrap()?;
+        let batch2_1 = stream2.next().await.unwrap()?;
+
+        assert_eq!(batch1_1.num_rows(), 3);
+        assert_eq!(batch2_1.num_rows(), 3);
+
+        // Continue reading
+        let batch1_2 = stream1.next().await.unwrap()?;
+        let batch2_2 = stream2.next().await.unwrap()?;
+
+        assert_eq!(batch1_2.num_rows(), 3);
+        assert_eq!(batch2_2.num_rows(), 3);
+
+        // Stream1 should be done, stream2 should have one more
+        assert!(stream1.next().await.is_none());
+        let batch2_3 = stream2.next().await.unwrap()?;
+        assert_eq!(batch2_3.num_rows(), 3);
+        assert!(stream2.next().await.is_none());
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_external_stream_multiple_spill_files() -> Result<()> {
+        let file1 = create_spill_file_with_batches(2).await?;
+        let file2 = create_spill_file_with_batches(3).await?;
+        let schema = create_test_schema();
+        let mut stream = ExternalEvaluatedBatchStream::try_from_spill_files(
+            Arc::clone(&schema),
+            vec![Arc::new(file1), Arc::new(file2)],
+        )?;
+
+        assert_eq!(stream.schema(), schema);
+
+        let mut batches_read = 0;
+        while let Some(batch_result) = stream.next().await {
+            let batch = batch_result?;
+            assert_eq!(batch.num_rows(), 3);
+            batches_read += 1;
+        }
+
+        assert_eq!(batches_read, 5);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_external_stream_empty_file() -> Result<()> {
+        let spill_file = create_spill_file_with_batches(0).await?;
+        let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?;
+
+        // Should immediately return None
+        let batch = stream.next().await;
+        assert!(batch.is_none());
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_external_stream_empty_spill_file_list() -> Result<()> {
+        let schema = create_test_schema();
+        let stream = ExternalEvaluatedBatchStream::try_from_spill_files(
+            Arc::clone(&schema),
+            Vec::<Arc<RefCountedTempFile>>::new(),
+        )?;
+
+        assert_eq!(stream.schema(), schema);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_external_stream_preserves_data() -> Result<()> {
+        let spill_file = create_spill_file_with_batches(1).await?;
+        let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?;
+
+        let batch = stream.next().await.unwrap()?;
+
+        // Verify data preservation
+        let id_array = batch
+            .batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_array.value(0), 0);
+        assert_eq!(id_array.value(1), 1);
+        assert_eq!(id_array.value(2), 2);
+
+        let name_array = batch
+            .batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(name_array.value(0), "Alice");
+        assert_eq!(name_array.value(1), "Bob");
+        assert!(name_array.is_null(2));
+
+        // Verify geometry array
+        assert_eq!(batch.geom_array.rects.len(), 3);
+
+        // Verify distance
+        match &batch.geom_array.distance {
+            Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(val)))) => {
+                assert_eq!(*val, 10.0);
+            }
+            _ => panic!("Expected scalar distance value"),
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_external_stream_multiple_batches() -> Result<()> {
+        let num_batches = 5;
+        let spill_file = create_spill_file_with_batches(num_batches).await?;
+        let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?;
+
+        let mut batches_read = 0;
+        while let Some(batch_result) = stream.next().await {
+            let batch = batch_result?;
+            assert_eq!(batch.num_rows(), 3);
+
+            // Verify the ID starts at the expected value
+            let id_array = batch
+                .batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap();
+            let expected_start_id = (batches_read * 3) as i32;
+            assert_eq!(id_array.value(0), expected_start_id);
+
+            batches_read += 1;
+        }
+
+        assert_eq!(batches_read, num_batches);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_external_stream_poll_after_completion() -> Result<()> {
+        let spill_file = create_spill_file_with_batches(1).await?;
+        let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?;
+
+        // Read the batch
+        let _ = stream.next().await.unwrap()?;
+
+        // Should return None
+        assert!(stream.next().await.is_none());
+
+        // Polling again should still return None
+        assert!(stream.next().await.is_none());
+        assert!(stream.next().await.is_none());
+
+        Ok(())
+    }
+}
diff --git a/rust/sedona-spatial-join/src/evaluated_batch/spill.rs b/rust/sedona-spatial-join/src/evaluated_batch/spill.rs
new file mode 100644
index 00000000..9d5dd8a8
--- /dev/null
+++ b/rust/sedona-spatial-join/src/evaluated_batch/spill.rs
@@ -0,0 +1,806 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::Float64Array;
+use arrow_array::{Array, RecordBatch, StructArray};
+use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
+use datafusion::config::SpillCompression;
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_execution::{disk_manager::RefCountedTempFile, runtime_env::RuntimeEnv};
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_plan::metrics::SpillMetrics;
+use sedona_common::sedona_internal_err;
+use sedona_schema::datatypes::SedonaType;
+
+use crate::{
+    evaluated_batch::EvaluatedBatch,
+    operand_evaluator::EvaluatedGeometryArray,
+    utils::spill::{RecordBatchSpillReader, RecordBatchSpillWriter},
+};
+
+/// Writer for spilling evaluated batches to disk
+pub struct EvaluatedBatchSpillWriter {
+    /// The temporary spill file being written to
+    inner: RecordBatchSpillWriter,
+
+    /// Schema of the spilled record batches. It is augmented from the schema of the original record batches.
+    /// The spill_schema has 3 fields:
+    /// * `data`: StructArray containing the original record batch columns
+    /// * `geom`: geometry array in storage format
+    /// * `dist`: distance field
+    spill_schema: Schema,
+    /// Inner fields of the "data" StructArray in the spilled record batches
+    data_inner_fields: Fields,
+}
+
+const SPILL_FIELD_DATA_INDEX: usize = 0;
+const SPILL_FIELD_GEOM_INDEX: usize = 1;
+const SPILL_FIELD_DIST_INDEX: usize = 2;
+
+impl EvaluatedBatchSpillWriter {
+    /// Create a new SpillWriter
+    pub fn try_new(
+        env: Arc<RuntimeEnv>,
+        schema: SchemaRef,
+        sedona_type: &SedonaType,
+        request_description: &str,
+        compression: SpillCompression,
+        metrics: SpillMetrics,
+        batch_size_threshold: Option<usize>,
+    ) -> Result<Self> {
+        // Construct schema of record batches to be written. The written batches are augmented from the original record batches.
+        let data_inner_fields = schema.fields().clone();
+        let data_struct_field =
+            Field::new("data", DataType::Struct(data_inner_fields.clone()), false);
+        let geom_field = sedona_type.to_storage_field("geom", true)?;
+        let dist_field = Field::new("dist", DataType::Float64, true);
+        let spill_schema = Schema::new(vec![data_struct_field, geom_field, dist_field]);
+
+        // Create spill file
+        let inner = RecordBatchSpillWriter::try_new(
+            env,
+            Arc::new(spill_schema.clone()),
+            request_description,
+            compression,
+            metrics,
+            batch_size_threshold,
+        )?;
+
+        Ok(Self {
+            inner,
+            spill_schema,
+            data_inner_fields,
+        })
+    }
+
+    /// Append an EvaluatedBatch to the spill file
+    pub fn append(&mut self, evaluated_batch: &EvaluatedBatch) -> Result<()> {
+        let record_batch = self.spilled_record_batch(evaluated_batch)?;
+
+        // Splitting/compaction and spill bytes/rows metrics are handled by `RecordBatchSpillWriter`.
+        self.inner.write_batch(record_batch)?;
+        Ok(())
+    }
+
+    /// Finish writing and return the temporary file
+    pub fn finish(self) -> Result<RefCountedTempFile> {
+        self.inner.finish()
+    }
+
+    fn spilled_record_batch(&self, evaluated_batch: &EvaluatedBatch) -> Result<RecordBatch> {
+        let num_rows = evaluated_batch.num_rows();
+
+        // Store the original data batch into a StructArray
+        let data_batch = &evaluated_batch.batch;
+        let data_arrays = data_batch.columns().to_vec();
+        let data_struct_array =
+            StructArray::try_new(self.data_inner_fields.clone(), data_arrays, None)?;
+
+        // Store dist into a Float64Array
+        let mut dist_builder = arrow::array::Float64Builder::with_capacity(num_rows);
+        let geom_array = &evaluated_batch.geom_array;
+        match &geom_array.distance {
+            Some(ColumnarValue::Scalar(scalar)) => match scalar {
+                ScalarValue::Float64(dist_value) => {
+                    for _ in 0..num_rows {
+                        dist_builder.append_option(*dist_value);
+                    }
+                }
+                _ => {
+                    return sedona_internal_err!("Distance columnar value is not a Float64 scalar");
+                }
+            },
+            Some(ColumnarValue::Array(array)) => {
+                let float_array = array
+                    .as_any()
+                    .downcast_ref::<arrow::array::Float64Array>()
+                    .unwrap();
+                dist_builder.append_array(float_array);
+            }
+            None => {
+                for _ in 0..num_rows {
+                    dist_builder.append_null();
+                }
+            }
+        }
+        let dist_array = dist_builder.finish();
+
+        // Assemble the final spilled RecordBatch
+        let columns = vec![
+            Arc::new(data_struct_array) as Arc<dyn arrow::array::Array>,
+            Arc::clone(&geom_array.geometry_array),
+            Arc::new(dist_array) as Arc<dyn arrow::array::Array>,
+        ];
+        let spilled_record_batch =
+            RecordBatch::try_new(Arc::new(self.spill_schema.clone()), columns)?;
+        Ok(spilled_record_batch)
+    }
+}
+/// Reader for reading spilled evaluated batches from disk
+pub struct EvaluatedBatchSpillReader {
+    inner: RecordBatchSpillReader,
+}
+impl EvaluatedBatchSpillReader {
+    /// Create a new SpillReader
+    pub fn try_new(temp_file: &RefCountedTempFile) -> Result<Self> {
+        Ok(Self {
+            inner: RecordBatchSpillReader::try_new(temp_file)?,
+        })
+    }
+
+    /// Get the schema of the spilled data
+    pub fn schema(&self) -> SchemaRef {
+        self.inner.schema()
+    }
+
+    /// Read the next EvaluatedBatch from the spill file
+    #[allow(unused)]
+    pub fn next_batch(&mut self) -> Option<Result<EvaluatedBatch>> {
+        self.next_raw_batch()
+            .map(|record_batch| record_batch.and_then(spilled_batch_to_evaluated_batch))
+    }
+
+    /// Read the next raw RecordBatch from the spill file
+    pub fn next_raw_batch(&mut self) -> Option<Result<RecordBatch>> {
+        self.inner.next_batch()
+    }
+}
+
+pub(crate) fn spilled_batch_to_evaluated_batch(
+    record_batch: RecordBatch,
+) -> Result<EvaluatedBatch> {
+    // Extract the data struct array (column 0) and convert back to the original RecordBatch
+    let data_array = record_batch
+        .column(SPILL_FIELD_DATA_INDEX)
+        .as_any()
+        .downcast_ref::<StructArray>()
+        .ok_or_else(|| {
+            DataFusionError::Internal("Expected data column to be a StructArray".to_string())
+        })?;
+
+    let data_schema = Arc::new(Schema::new(match data_array.data_type() {
+        DataType::Struct(fields) => fields.clone(),
+        _ => {
+            return Err(DataFusionError::Internal(
+                "Expected data column to have Struct data type".to_string(),
+            ))
+        }
+    }));
+
+    let data_columns = (0..data_array.num_columns())
+        .map(|i| Arc::clone(data_array.column(i)))
+        .collect::<Vec<_>>();
+
+    let batch = RecordBatch::try_new(data_schema, data_columns)?;
+
+    // Extract the geometry array (column 1)
+    let geom_array = Arc::clone(record_batch.column(SPILL_FIELD_GEOM_INDEX));
+
+    // Determine the SedonaType from the geometry field in the record batch schema
+    let schema = record_batch.schema();
+    let geom_field = schema.field(SPILL_FIELD_GEOM_INDEX);
+    let sedona_type = SedonaType::from_storage_field(geom_field)?;
+
+    // Extract the distance array (column 2) and convert back to ColumnarValue
+    let dist_array = record_batch
+        .column(SPILL_FIELD_DIST_INDEX)
+        .as_any()
+        .downcast_ref::<Float64Array>()
+        .ok_or_else(|| {
+            DataFusionError::Internal("Expected dist column to be 
Float64Array".to_string())
+        })?;
+
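+    // Scalar distances are materialized as a repeated Float64 column when spilled,
+    // so a column whose values are all equal is collapsed back into a scalar here.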
+    let distance = if !dist_array.is_empty() {
+        // Check if all values are the same (scalar case)
+        let first_value = if dist_array.is_null(0) {
+            None
+        } else {
+            Some(dist_array.value(0))
+        };
+
+        let all_same = (1..dist_array.len()).all(|i| {
+            let current_value = if dist_array.is_null(i) {
+                None
+            } else {
+                Some(dist_array.value(i))
+            };
+            current_value == first_value
+        });
+
+        if all_same {
+            Some(ColumnarValue::Scalar(ScalarValue::Float64(first_value)))
+        } else {
+            Some(ColumnarValue::Array(Arc::clone(
+                record_batch.column(SPILL_FIELD_DIST_INDEX),
+            )))
+        }
+    } else {
+        None
+    };
+
+    // Create EvaluatedGeometryArray
+    let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, &sedona_type)?;
+    geom_array.distance = distance;
+
+    Ok(EvaluatedBatch { batch, geom_array })
+}
+
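+/// Recover the schema of the original data batches from the schema of a spill
+/// file by unwrapping the fields of the `data` struct column.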
+pub(crate) fn spilled_schema_to_evaluated_schema(spilled_schema: &SchemaRef) -> Result<SchemaRef> {
+    if spilled_schema.fields().is_empty() {
+        return Ok(SchemaRef::new(Schema::empty()));
+    }
+
+    let data_field = spilled_schema.field(SPILL_FIELD_DATA_INDEX);
+    let inner_fields = match data_field.data_type() {
+        DataType::Struct(fields) => fields.clone(),
+        _ => {
+            return sedona_internal_err!("Invalid schema of spilled file: 
{:?}", spilled_schema);
+        }
+    };
+    Ok(SchemaRef::new(Schema::new(inner_fields)))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::utils::arrow_utils::get_record_batch_memory_size;
+    use arrow_array::{ArrayRef, BinaryArray, Int32Array, StringArray};
+    use arrow_schema::{DataType, Field, Schema};
+    use datafusion_common::Result;
+    use datafusion_execution::runtime_env::RuntimeEnv;
+    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+    use sedona_schema::datatypes::WKB_GEOMETRY;
+    use std::sync::Arc;
+
+    fn create_test_runtime_env() -> Result<Arc<RuntimeEnv>> {
+        Ok(Arc::new(RuntimeEnv::default()))
+    }
+
+    fn create_test_schema() -> SchemaRef {
+        Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+        ]))
+    }
+
+    fn create_test_record_batch() -> Result<RecordBatch> {
+        let schema = create_test_schema();
+        let id_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
+        let name_array = Arc::new(StringArray::from(vec![Some("Alice"), 
Some("Bob"), None]));
+        RecordBatch::try_new(schema, vec![id_array, name_array]).map_err(|e| e.into())
+    }
+
+    fn create_test_geometry_array() -> Result<(ArrayRef, SedonaType)> {
+        // Create WKB encoded points (simple binary data for testing)
+        // WKB for POINT (1 2): 01 01000000 0000000000000000F03F 0000000000000040
+        let point1_wkb: Vec<u8> = vec![
+            1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 240, 63, 0, 0, 0, 0, 0, 0, 0, 64,
+        ];
+        let point2_wkb: Vec<u8> = vec![
+            1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0, 16, 64,
+        ];
+        let point3_wkb: Vec<u8> = vec![
+            1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 64, 0, 0, 0, 0, 0, 0, 24, 64,
+        ];
+
+        let sedona_type = WKB_GEOMETRY;
+        let geom_array: ArrayRef = Arc::new(BinaryArray::from(vec![
+            Some(point1_wkb.as_slice()),
+            Some(point2_wkb.as_slice()),
+            Some(point3_wkb.as_slice()),
+        ]));
+
+        Ok((geom_array, sedona_type))
+    }
+
+    fn create_test_evaluated_batch() -> Result<EvaluatedBatch> {
+        let batch = create_test_record_batch()?;
+        let (geom_array, sedona_type) = create_test_geometry_array()?;
+        let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, &sedona_type)?;
+
+        // Add distance as a scalar value
+        geom_array.distance = Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0))));
+
+        Ok(EvaluatedBatch { batch, geom_array })
+    }
+
+    fn create_test_evaluated_batch_with_array_distance() -> Result<EvaluatedBatch> {
+        let batch = create_test_record_batch()?;
+        let (geom_array, sedona_type) = create_test_geometry_array()?;
+        let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, &sedona_type)?;
+
+        // Add distance as an array value
+        let dist_array = Arc::new(Float64Array::from(vec![Some(1.0), Some(2.0), Some(3.0)]));
+        geom_array.distance = Some(ColumnarValue::Array(dist_array));
+
+        Ok(EvaluatedBatch { batch, geom_array })
+    }
+
+    fn create_test_evaluated_batch_with_nulls() -> Result<EvaluatedBatch> {
+        let schema = create_test_schema();
+        let id_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
+        let name_array = Arc::new(StringArray::from(vec![
+            Some("Alice"),
+            None,
+            Some("Charlie"),
+        ]));
+        let batch = RecordBatch::try_new(schema, vec![id_array, name_array])
+            .map_err(DataFusionError::from)?;
+
+        // Create geometry array with null in the middle
+        let point1_wkb: Vec<u8> = vec![
+            1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 240, 63, 0, 0, 0, 0, 0, 0, 0, 64,
+        ];
+        let point3_wkb: Vec<u8> = vec![
+            1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 64, 0, 0, 0, 0, 0, 0, 24, 64,
+        ];
+
+        let sedona_type = WKB_GEOMETRY;
+        let geom_array: ArrayRef = Arc::new(BinaryArray::from(vec![
+            Some(point1_wkb.as_slice()),
+            None,
+            Some(point3_wkb.as_slice()),
+        ]));
+
+        let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, &sedona_type)?;
+
+        // Add distance with nulls
+        let dist_array = Arc::new(Float64Array::from(vec![Some(1.0), None, Some(3.0)]));
+        geom_array.distance = Some(ColumnarValue::Array(dist_array));
+
+        Ok(EvaluatedBatch { batch, geom_array })
+    }
+
+    #[test]
+    fn test_spill_writer_creation() -> Result<()> {
+        let env = create_test_runtime_env()?;
+        let schema = create_test_schema();
+        let sedona_type = WKB_GEOMETRY;
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let metrics = SpillMetrics::new(&metrics_set, 0);
+
+        let writer = EvaluatedBatchSpillWriter::try_new(
+            env,
+            schema,
+            &sedona_type,
+            "test_spill",
+            SpillCompression::Uncompressed,
+            metrics,
+            None,
+        )?;
+
+        // Verify the spill schema has the expected structure
+        assert_eq!(writer.spill_schema.fields().len(), 3);
+        assert_eq!(
+            writer.spill_schema.field(SPILL_FIELD_DATA_INDEX).name(),
+            "data"
+        );
+        assert_eq!(
+            writer.spill_schema.field(SPILL_FIELD_GEOM_INDEX).name(),
+            "geom"
+        );
+        assert_eq!(
+            writer.spill_schema.field(SPILL_FIELD_DIST_INDEX).name(),
+            "dist"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_spill_write_and_read_basic() -> Result<()> {
+        let env = create_test_runtime_env()?;
+        let schema = create_test_schema();
+        let sedona_type = WKB_GEOMETRY;
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let metrics = SpillMetrics::new(&metrics_set, 0);
+
+        let mut writer = EvaluatedBatchSpillWriter::try_new(
+            env,
+            schema,
+            &sedona_type,
+            "test_spill",
+            SpillCompression::Uncompressed,
+            metrics,
+            None,
+        )?;
+
+        let evaluated_batch = create_test_evaluated_batch()?;
+        let original_num_rows = evaluated_batch.num_rows();
+
+        writer.append(&evaluated_batch)?;
+        let temp_file = writer.finish()?;
+
+        // Read back the spilled data
+        let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
+        let read_batch_result = reader.next_batch();
+
+        assert!(read_batch_result.is_some());
+        let read_batch = read_batch_result.unwrap()?;
+
+        // Verify the data
+        assert_eq!(read_batch.num_rows(), original_num_rows);
+        assert_eq!(read_batch.batch.num_columns(), 2); // id and name columns
+
+        // Verify that there are no more batches
+        assert!(reader.next_batch().is_none());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_spill_write_and_read_with_array_distance() -> Result<()> {
+        let env = create_test_runtime_env()?;
+        let schema = create_test_schema();
+        let sedona_type = WKB_GEOMETRY;
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let metrics = SpillMetrics::new(&metrics_set, 0);
+
+        let mut writer = EvaluatedBatchSpillWriter::try_new(
+            env,
+            schema,
+            &sedona_type,
+            "test_spill",
+            SpillCompression::Uncompressed,
+            metrics,
+            None,
+        )?;
+
+        let evaluated_batch = create_test_evaluated_batch_with_array_distance()?;
+        writer.append(&evaluated_batch)?;
+        let temp_file = writer.finish()?;
+
+        // Read back the spilled data
+        let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
+        let read_batch = reader.next_batch().unwrap()?;
+
+        // Verify distance is read back as array
+        match &read_batch.geom_array.distance {
+            Some(ColumnarValue::Array(array)) => {
+                let float_array = array.as_any().downcast_ref::<Float64Array>().unwrap();
+                assert_eq!(float_array.len(), 3);
+                assert_eq!(float_array.value(0), 1.0);
+                assert_eq!(float_array.value(1), 2.0);
+                assert_eq!(float_array.value(2), 3.0);
+            }
+            _ => panic!("Expected distance to be an array"),
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_spill_write_and_read_with_nulls() -> Result<()> {
+        let env = create_test_runtime_env()?;
+        let schema = create_test_schema();
+        let sedona_type = WKB_GEOMETRY;
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let metrics = SpillMetrics::new(&metrics_set, 0);
+
+        let mut writer = EvaluatedBatchSpillWriter::try_new(
+            env,
+            schema,
+            &sedona_type,
+            "test_spill",
+            SpillCompression::Uncompressed,
+            metrics,
+            None,
+        )?;
+
+        let evaluated_batch = create_test_evaluated_batch_with_nulls()?;
+        writer.append(&evaluated_batch)?;
+        let temp_file = writer.finish()?;
+
+        // Read back the spilled data
+        let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
+        let read_batch = reader.next_batch().unwrap()?;
+
+        // Verify nulls are preserved
+        assert_eq!(read_batch.num_rows(), 3);
+        assert!(read_batch.geom_array.rects[1].is_none()); // Null geometry
+
+        // Verify distance nulls
+        match &read_batch.geom_array.distance {
+            Some(ColumnarValue::Array(array)) => {
+                let float_array = array.as_any().downcast_ref::<Float64Array>().unwrap();
+                assert!(float_array.is_valid(0));
+                assert!(float_array.is_null(1));
+                assert!(float_array.is_valid(2));
+            }
+            _ => panic!("Expected distance to be an array"),
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_spill_multiple_batches() -> Result<()> {
+        let env = create_test_runtime_env()?;
+        let schema = create_test_schema();
+        let sedona_type = WKB_GEOMETRY;
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let metrics = SpillMetrics::new(&metrics_set, 0);
+
+        let mut writer = EvaluatedBatchSpillWriter::try_new(
+            env,
+            schema,
+            &sedona_type,
+            "test_spill",
+            SpillCompression::Uncompressed,
+            metrics,
+            None,
+        )?;
+
+        // Write multiple batches
+        let batch1 = create_test_evaluated_batch()?;
+        let batch2 = create_test_evaluated_batch_with_array_distance()?;
+        let batch3 = create_test_evaluated_batch_with_nulls()?;
+
+        writer.append(&batch1)?;
+        writer.append(&batch2)?;
+        writer.append(&batch3)?;
+        let temp_file = writer.finish()?;
+
+        // Read back all batches
+        let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
+
+        let read_batch1 = reader.next_batch().unwrap()?;
+        assert_eq!(read_batch1.num_rows(), 3);
+
+        let read_batch2 = reader.next_batch().unwrap()?;
+        assert_eq!(read_batch2.num_rows(), 3);
+
+        let read_batch3 = reader.next_batch().unwrap()?;
+        assert_eq!(read_batch3.num_rows(), 3);
+
+        // Verify no more batches
+        assert!(reader.next_batch().is_none());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_spill_metrics_updated() -> Result<()> {
+        let env = create_test_runtime_env()?;
+        let schema = create_test_schema();
+        let sedona_type = WKB_GEOMETRY;
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let metrics = SpillMetrics::new(&metrics_set, 0);
+
+        let mut writer = EvaluatedBatchSpillWriter::try_new(
+            env,
+            schema,
+            &sedona_type,
+            "test_spill",
+            SpillCompression::Uncompressed,
+            metrics.clone(),
+            None,
+        )?;
+
+        let evaluated_batch = create_test_evaluated_batch()?;
+        writer.append(&evaluated_batch)?;
+        writer.finish()?;
+
+        // Verify spill metrics were updated
+        assert!(metrics.spilled_rows.value() > 0);
+        assert!(metrics.spilled_bytes.value() > 0);
+
+        // Verify spill file count was updated
+        assert_eq!(metrics.spill_file_count.value(), 1);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_spill_rect_preservation() -> Result<()> {
+        let env = create_test_runtime_env()?;
+        let schema = create_test_schema();
+        let sedona_type = WKB_GEOMETRY;
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let metrics = SpillMetrics::new(&metrics_set, 0);
+
+        let mut writer = EvaluatedBatchSpillWriter::try_new(
+            env,
+            schema,
+            &sedona_type,
+            "test_spill",
+            SpillCompression::Uncompressed,
+            metrics,
+            None,
+        )?;
+
+        let evaluated_batch = create_test_evaluated_batch()?;
+        let original_rects = evaluated_batch.rects().clone();
+
+        writer.append(&evaluated_batch)?;
+        let temp_file = writer.finish()?;
+
+        // Read back and verify rects
+        let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
+        let read_batch = reader.next_batch().unwrap()?;
+
+        assert_eq!(read_batch.rects().len(), original_rects.len());
+        for (original, read) in original_rects.iter().zip(read_batch.rects().iter()) {
+            match (original, read) {
+                (Some(orig_rect), Some(read_rect)) => {
+                    assert_eq!(orig_rect.min().x, read_rect.min().x);
+                    assert_eq!(orig_rect.min().y, read_rect.min().y);
+                    assert_eq!(orig_rect.max().x, read_rect.max().x);
+                    assert_eq!(orig_rect.max().y, read_rect.max().y);
+                }
+                (None, None) => {}
+                _ => panic!("Rect mismatch between original and read"),
+            }
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_spill_scalar_distance_preserved() -> Result<()> {
+        let env = create_test_runtime_env()?;
+        let schema = create_test_schema();
+        let sedona_type = WKB_GEOMETRY;
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let metrics = SpillMetrics::new(&metrics_set, 0);
+
+        let mut writer = EvaluatedBatchSpillWriter::try_new(
+            env,
+            schema,
+            &sedona_type,
+            "test_spill",
+            SpillCompression::Uncompressed,
+            metrics,
+            None,
+        )?;
+
+        let evaluated_batch = create_test_evaluated_batch()?;
+        writer.append(&evaluated_batch)?;
+        let temp_file = writer.finish()?;
+
+        // Read back and verify scalar distance is preserved
+        let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
+        let read_batch = reader.next_batch().unwrap()?;
+
+        match &read_batch.geom_array.distance {
+            Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(val)))) => {
+                assert_eq!(*val, 10.0);
+            }
+            _ => panic!("Expected scalar distance value"),
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_spill_empty_batch() -> Result<()> {
+        let env = create_test_runtime_env()?;
+        let schema = create_test_schema();
+        let sedona_type = WKB_GEOMETRY;
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let metrics = SpillMetrics::new(&metrics_set, 0);
+
+        let mut writer = EvaluatedBatchSpillWriter::try_new(
+            env,
+            schema.clone(),
+            &sedona_type,
+            "test_spill",
+            SpillCompression::Uncompressed,
+            metrics,
+            None,
+        )?;
+
+        // Create an empty batch
+        let id_array = Arc::new(Int32Array::from(Vec::<i32>::new()));
+        let name_array = Arc::new(StringArray::from(Vec::<Option<&str>>::new()));
+        let empty_batch = RecordBatch::try_new(schema, vec![id_array, name_array])
+            .map_err(DataFusionError::from)?;
+
+        let geom_array: ArrayRef = Arc::new(BinaryArray::from(Vec::<Option<&[u8]>>::new()));
+        let geom_array = EvaluatedGeometryArray::try_new(geom_array, &sedona_type)?;
+
+        let evaluated_batch = EvaluatedBatch {
+            batch: empty_batch,
+            geom_array,
+        };
+
+        writer.append(&evaluated_batch)?;
+        let temp_file = writer.finish()?;
+
+        // Read back and verify
+        let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
+        let read_batch = reader.next_batch().unwrap()?;
+        assert_eq!(read_batch.num_rows(), 0);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_spill_batch_splitting() -> Result<()> {
+        let env = create_test_runtime_env()?;
+        let schema = create_test_schema();
+        let sedona_type = WKB_GEOMETRY;
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let metrics = SpillMetrics::new(&metrics_set, 0);
+
+        // Create a batch
+        let evaluated_batch = create_test_evaluated_batch()?;
+        let batch_size = get_record_batch_memory_size(&evaluated_batch.batch)?;
+
+        // Set threshold to be smaller than batch size, so it splits into at least 2 parts
+        let threshold = batch_size / 2;
+
+        let mut writer = EvaluatedBatchSpillWriter::try_new(
+            env,
+            schema,
+            &sedona_type,
+            "test_spill",
+            SpillCompression::Uncompressed,
+            metrics,
+            Some(threshold),
+        )?;
+
+        writer.append(&evaluated_batch)?;
+        let temp_file = writer.finish()?;
+
+        // Read back the spilled data
+        let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
+
+        // We expect multiple batches
+        let mut num_batches = 0;
+        let mut total_rows = 0;
+        while let Some(batch_result) = reader.next_batch() {
+            let batch = batch_result?;
+            num_batches += 1;
+            total_rows += batch.num_rows();
+        }
+
+        assert!(
+            num_batches > 1,
+            "Batch should have been split into multiple batches"
+        );
+        assert_eq!(
+            total_rows,
+            evaluated_batch.num_rows(),
+            "Total rows should match"
+        );
+
+        Ok(())
+    }
+}
diff --git a/rust/sedona-spatial-join/src/index/build_side_collector.rs b/rust/sedona-spatial-join/src/index/build_side_collector.rs
index a1643834..9230aaf4 100644
--- a/rust/sedona-spatial-join/src/index/build_side_collector.rs
+++ b/rust/sedona-spatial-join/src/index/build_side_collector.rs
@@ -140,7 +140,7 @@ impl BuildSideBatchesCollector {
             self.collect_all_concurrently(streams, reservations, metrics_vec)
                 .await
         } else {
-            self.collect_all_sequential(streams, reservations, metrics_vec)
+            self.collect_all_sequentially(streams, reservations, metrics_vec)
                 .await
         }
     }
@@ -187,7 +187,7 @@ impl BuildSideBatchesCollector {
         Ok(partitions.into_iter().map(|v| v.unwrap()).collect())
     }
 
-    async fn collect_all_sequential(
+    async fn collect_all_sequentially(
         &self,
         streams: Vec<SendableRecordBatchStream>,
         reservations: Vec<MemoryReservation>,
diff --git a/rust/sedona-spatial-join/src/lib.rs b/rust/sedona-spatial-join/src/lib.rs
index 0c99b91b..94af3f22 100644
--- a/rust/sedona-spatial-join/src/lib.rs
+++ b/rust/sedona-spatial-join/src/lib.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 mod build_index;
-mod evaluated_batch;
+pub mod evaluated_batch;
 pub mod exec;
 mod index;
 pub mod operand_evaluator;
diff --git a/rust/sedona-spatial-join/src/operand_evaluator.rs b/rust/sedona-spatial-join/src/operand_evaluator.rs
index b76e9116..8b431396 100644
--- a/rust/sedona-spatial-join/src/operand_evaluator.rs
+++ b/rust/sedona-spatial-join/src/operand_evaluator.rs
@@ -90,7 +90,7 @@ pub(crate) fn create_operand_evaluator(
 }
 
 /// Result of evaluating a geometry batch.
-pub(crate) struct EvaluatedGeometryArray {
+pub struct EvaluatedGeometryArray {
     /// The array of geometries produced by evaluating the geometry expression.
     pub geometry_array: ArrayRef,
     /// The rects of the geometries in the geometry array. The length of this array is equal to the number of geometries.
diff --git a/rust/sedona-spatial-join/src/utils.rs b/rust/sedona-spatial-join/src/utils.rs
index 6db8f85a..3f53df47 100644
--- a/rust/sedona-spatial-join/src/utils.rs
+++ b/rust/sedona-spatial-join/src/utils.rs
@@ -21,3 +21,4 @@ pub(crate) mod concurrent_reservation;
 pub(crate) mod init_once_array;
 pub(crate) mod join_utils;
 pub(crate) mod once_fut;
+pub(crate) mod spill;
diff --git a/rust/sedona-spatial-join/src/utils/arrow_utils.rs b/rust/sedona-spatial-join/src/utils/arrow_utils.rs
index 367568fc..258b1f8a 100644
--- a/rust/sedona-spatial-join/src/utils/arrow_utils.rs
+++ b/rust/sedona-spatial-join/src/utils/arrow_utils.rs
@@ -21,10 +21,21 @@ use arrow::array::{Array, ArrayData, BinaryViewArray, ListArray, RecordBatch, St
 use arrow_array::make_array;
 use arrow_array::ArrayRef;
 use arrow_array::StructArray;
+use arrow_schema::SchemaRef;
 use arrow_schema::{ArrowError, DataType};
 use datafusion_common::Result;
 use sedona_common::sedona_internal_err;
 
+/// Checks if the schema contains any view types (Utf8View or BinaryView). Batches
+/// with view types may need special handling (e.g. compaction) before spilling
+/// or holding in memory for extended periods.
+pub(crate) fn schema_contains_view_types(schema: &SchemaRef) -> bool {
+    schema
+        .flattened_fields()
+        .iter()
+        .any(|field| matches!(field.data_type(), DataType::Utf8View | DataType::BinaryView))
+}
+
 /// Reconstruct `batch` to organize the payload buffers of each `StringViewArray` and
 /// `BinaryViewArray` in sequential order by calling `gc()` on them.
 ///
@@ -246,9 +257,55 @@ mod tests {
     use arrow_array::{
         BinaryViewArray, BooleanArray, ListArray, StringArray, StringViewArray, StructArray,
     };
-    use arrow_schema::{DataType, Field, Schema};
+    use arrow_schema::{DataType, Field, Fields, Schema};
     use std::sync::Arc;
 
+    fn make_schema(fields: Vec<Field>) -> SchemaRef {
+        Arc::new(Schema::new(fields))
+    }
+
+    #[test]
+    fn test_schema_contains_view_types_top_level() {
+        let schema_ref = make_schema(vec![
+            Field::new("a", DataType::Utf8View, true),
+            Field::new("b", DataType::BinaryView, true),
+        ]);
+
+        assert!(schema_contains_view_types(&schema_ref));
+
+        // Similar shape but without view types
+        let schema_no_view = make_schema(vec![
+            Field::new("a", DataType::Utf8, true),
+            Field::new("b", DataType::Binary, true),
+        ]);
+        assert!(!schema_contains_view_types(&schema_no_view));
+    }
+
+    #[test]
+    fn test_schema_contains_view_types_nested() {
+        let nested = Field::new(
+            "s",
+            DataType::Struct(Fields::from(vec![Field::new(
+                "v",
+                DataType::Utf8View,
+                true,
+            )])),
+            true,
+        );
+
+        let schema_ref = make_schema(vec![nested]);
+        assert!(schema_contains_view_types(&schema_ref));
+
+        // Nested struct without any view types
+        let nested_no_view = Field::new(
+            "s",
+            DataType::Struct(Fields::from(vec![Field::new("v", DataType::Utf8, 
true)])),
+            true,
+        );
+        let schema_no_view = make_schema(vec![nested_no_view]);
+        assert!(!schema_contains_view_types(&schema_no_view));
+    }
+
     #[test]
     fn test_string_view_array_memory_size() {
         let array = StringViewArray::from(vec![
@@ -523,12 +580,12 @@ mod tests {
 
         let array: ArrayRef = Arc::new(struct_array);
         let slice = array.slice(0, 2);
-        let before_size = get_array_memory_size(&array).unwrap();
+        let before_size = slice.get_array_memory_size();
 
         let (compacted, mutated) = compact_array(Arc::new(slice)).unwrap();
         assert!(mutated);
 
-        let after_size = get_array_memory_size(&compacted).unwrap();
+        let after_size = compacted.get_array_memory_size();
         assert!(after_size < before_size);
     }
 
@@ -548,12 +605,12 @@ mod tests {
         }
         let bv_list: ListArray = bv_list_builder.finish();
         let sliced: ArrayRef = Arc::new(bv_list.slice(0, 1));
-        let before_size = get_array_memory_size(&sliced).unwrap();
+        let before_size = sliced.get_array_memory_size();
 
         let (compacted, mutated) = compact_array(Arc::clone(&sliced)).unwrap();
         assert!(mutated);
 
-        let after_size = get_array_memory_size(&compacted).unwrap();
+        let after_size = compacted.get_array_memory_size();
         assert!(after_size <= before_size);
     }
 
diff --git a/rust/sedona-spatial-join/src/utils/spill.rs b/rust/sedona-spatial-join/src/utils/spill.rs
new file mode 100644
index 00000000..fef585a5
--- /dev/null
+++ b/rust/sedona-spatial-join/src/utils/spill.rs
@@ -0,0 +1,382 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{fs::File, io::BufReader, sync::Arc};
+
+use arrow::ipc::{
+    reader::StreamReader,
+    writer::{IpcWriteOptions, StreamWriter},
+};
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
+use datafusion::config::SpillCompression;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_execution::{disk_manager::RefCountedTempFile, runtime_env::RuntimeEnv};
+use datafusion_physical_plan::metrics::SpillMetrics;
+
+use crate::utils::arrow_utils::{
+    compact_batch, get_record_batch_memory_size, schema_contains_view_types,
+};
+
+/// Generic Arrow IPC stream spill writer for [`RecordBatch`].
+///
+/// Shared between multiple components so spill metrics are updated consistently.
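+///
+/// Typical usage: write each batch with [`Self::write_batch`], then call
+/// [`Self::finish`] to obtain the temp file, which a [`RecordBatchSpillReader`]
+/// can read the batches back from.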
+pub(crate) struct RecordBatchSpillWriter {
+    in_progress_file: RefCountedTempFile,
+    writer: StreamWriter<File>,
+    metrics: SpillMetrics,
+    batch_size_threshold: Option<usize>,
+    gc_view_arrays: bool,
+}
+
+impl RecordBatchSpillWriter {
+    pub fn try_new(
+        env: Arc<RuntimeEnv>,
+        schema: SchemaRef,
+        request_description: &str,
+        compression: SpillCompression,
+        metrics: SpillMetrics,
+        batch_size_threshold: Option<usize>,
+    ) -> Result<Self> {
+        let in_progress_file = env.disk_manager.create_tmp_file(request_description)?;
+        let file = File::create(in_progress_file.path())?;
+
+        let mut write_options = IpcWriteOptions::default();
+        write_options = write_options.try_with_compression(compression.into())?;
+
+        let writer = StreamWriter::try_new_with_options(file, schema.as_ref(), write_options)?;
+        metrics.spill_file_count.add(1);
+
+        let gc_view_arrays = schema_contains_view_types(&schema);
+
+        Ok(Self {
+            in_progress_file,
+            writer,
+            metrics,
+            batch_size_threshold,
+            gc_view_arrays,
+        })
+    }
+
+    /// Write a record batch to the spill file.
+    ///
+    /// If `batch_size_threshold` is configured and the in-memory size of the batch exceeds the
+    /// threshold, this will automatically split the batch into smaller slices and (optionally)
+    /// compact each slice before writing.
+    pub fn write_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        let num_rows = batch.num_rows();
+        if num_rows == 0 {
+            // Preserve "empty batch" semantics: callers may rely on spilling 
and reading back a
+            // zero-row batch (e.g. as a sentinel for an empty stream).
+            return self.write_one_batch(batch);
+        }
+
+        let rows_per_split = self.calculate_rows_per_split(&batch, num_rows)?;
+        if rows_per_split < num_rows {
+            let mut offset = 0;
+            while offset < num_rows {
+                let length = std::cmp::min(rows_per_split, num_rows - offset);
+                let slice = batch.slice(offset, length);
+                self.write_one_batch(slice)?;
+                offset += length;
+            }
+        } else {
+            self.write_one_batch(batch)?;
+        }
+        Ok(())
+    }
+
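+    /// Compute how many rows each written slice may contain so that every slice
+    /// stays at or below `batch_size_threshold`. Returns `num_rows` (i.e. no
+    /// splitting) when no threshold is configured or the batch already fits.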
+    fn calculate_rows_per_split(&self, batch: &RecordBatch, num_rows: usize) -> Result<usize> {
+        let Some(threshold) = self.batch_size_threshold else {
+            return Ok(num_rows);
+        };
+        if threshold == 0 {
+            return Ok(num_rows);
+        }
+
+        let batch_size = get_record_batch_memory_size(batch)?;
+        if batch_size <= threshold {
+            return Ok(num_rows);
+        }
+
+        let num_splits = batch_size.div_ceil(threshold);
+        let rows = num_rows.div_ceil(num_splits);
+        Ok(std::cmp::max(1, rows))
+    }
+
+    fn write_one_batch(&mut self, batch: RecordBatch) -> Result<()> {
+        // Writing record batches containing sparse binary view arrays may lead to excessive
+        // disk usage and slow read performance later. Compact such batches before writing.
+        let batch = if self.gc_view_arrays {
+            compact_batch(batch)?
+        } else {
+            batch
+        };
+        self.writer.write(&batch).map_err(|e| {
+            DataFusionError::Execution(format!(
+                "Failed to write RecordBatch to spill file {:?}: {}",
+                self.in_progress_file.path(),
+                e
+            ))
+        })?;
+
+        self.metrics.spilled_rows.add(batch.num_rows());
+        Ok(())
+    }
+
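+    /// Finalize the IPC stream and return the completed spill file, recording
+    /// the file's final on-disk size in the spilled-bytes metric.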
+    pub fn finish(mut self) -> Result<RefCountedTempFile> {
+        self.writer.finish()?;
+
+        let mut in_progress_file = self.in_progress_file;
+        in_progress_file.update_disk_usage()?;
+        let size = in_progress_file.current_disk_usage();
+        self.metrics.spilled_bytes.add(size as usize);
+        Ok(in_progress_file)
+    }
+}
+
+/// Generic Arrow IPC stream spill reader for [`RecordBatch`].
+pub(crate) struct RecordBatchSpillReader {
+    stream_reader: StreamReader<BufReader<File>>,
+}
+
+impl RecordBatchSpillReader {
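+    /// Open a spilled Arrow IPC stream file produced by [`RecordBatchSpillWriter`].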
+    pub fn try_new(temp_file: &RefCountedTempFile) -> Result<Self> {
+        let file = File::open(temp_file.path())?;
+        let mut stream_reader = StreamReader::try_new_buffered(file, None)?;
+
+        // SAFETY: spill writers in this crate strictly follow Arrow IPC specifications.
+        // Skip redundant validation during read to speed up.
+        unsafe {
+            stream_reader = stream_reader.with_skip_validation(true);
+        }
+
+        Ok(Self { stream_reader })
+    }
+
+    pub fn schema(&self) -> SchemaRef {
+        self.stream_reader.schema()
+    }
+
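+    /// Read the next [`RecordBatch`] from the spill file, or `None` once the
+    /// IPC stream is exhausted.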
+    pub fn next_batch(&mut self) -> Option<Result<RecordBatch>> {
+        self.stream_reader
+            .next()
+            .map(|result| result.map_err(|e| e.into()))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow_array::builder::BinaryViewBuilder;
+    use arrow_array::{Int32Array, StringArray};
+    use arrow_schema::{DataType, Field, Schema};
+    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+
+    fn create_test_runtime_env() -> Result<Arc<RuntimeEnv>> {
+        Ok(Arc::new(RuntimeEnv::default()))
+    }
+
+    fn create_test_schema() -> SchemaRef {
+        Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+        ]))
+    }
+
+    fn create_test_record_batch(num_rows: usize) -> RecordBatch {
+        let ids: Int32Array = (0..num_rows as i32).collect();
+
+        let names: StringArray = (0..num_rows)
+            .map(|i| {
+                if i % 3 == 0 {
+                    None
+                } else {
+                    Some(format!("name_{i}"))
+                }
+            })
+            .collect();
+
+        RecordBatch::try_new(create_test_schema(), vec![Arc::new(ids), Arc::new(names)]).unwrap()
+    }
+
+    fn create_test_binary_view_batch(num_rows: usize, value_len: usize) -> RecordBatch {
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "payload",
+            DataType::BinaryView,
+            false,
+        )]));
+
+        let mut builder = BinaryViewBuilder::new();
+        for i in 0..num_rows {
+            let byte = b'a' + (i % 26) as u8;
+            let bytes = vec![byte; value_len];
+            builder.append_value(bytes.as_slice());
+        }
+
+        let array = Arc::new(builder.finish());
+        RecordBatch::try_new(schema, vec![array]).unwrap()
+    }
+
+    #[test]
+    fn test_record_batch_spill_empty_batch_round_trip() -> Result<()> {
+        let env = create_test_runtime_env()?;
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let metrics = SpillMetrics::new(&metrics_set, 0);
+
+        let schema = create_test_schema();
+        let mut writer = RecordBatchSpillWriter::try_new(
+            env,
+            schema.clone(),
+            "test_record_batch_spill_empty",
+            SpillCompression::Uncompressed,
+            metrics.clone(),
+            None,
+        )?;
+
+        let empty = create_test_record_batch(0);
+        writer.write_batch(empty)?;
+        let file = writer.finish()?;
+
+        assert_eq!(metrics.spill_file_count.value(), 1);
+        assert_eq!(metrics.spilled_rows.value(), 0);
+
+        let mut reader = RecordBatchSpillReader::try_new(&file)?;
+        let read = reader.next_batch().unwrap()?;
+        assert_eq!(read.num_rows(), 0);
+        assert_eq!(read.schema(), schema);
+        assert!(reader.next_batch().is_none());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_record_batch_spill_round_trip() -> Result<()> {
+        let env = create_test_runtime_env()?;
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let metrics = SpillMetrics::new(&metrics_set, 0);
+
+        let schema = create_test_schema();
+        let mut writer = RecordBatchSpillWriter::try_new(
+            env,
+            schema.clone(),
+            "test_record_batch_spill",
+            SpillCompression::Uncompressed,
+            metrics.clone(),
+            None,
+        )?;
+
+        let batch1 = create_test_record_batch(5);
+        let batch2 = create_test_record_batch(3);
+        writer.write_batch(batch1)?;
+        writer.write_batch(batch2)?;
+
+        let file = writer.finish()?;
+
+        assert_eq!(metrics.spill_file_count.value(), 1);
+        assert_eq!(metrics.spilled_rows.value(), 8);
+        assert!(metrics.spilled_bytes.value() > 0);
+
+        let mut reader = RecordBatchSpillReader::try_new(&file)?;
+        assert_eq!(reader.schema(), schema);
+
+        let read1 = reader.next_batch().unwrap()?;
+        assert_eq!(read1.num_rows(), 5);
+        let read2 = reader.next_batch().unwrap()?;
+        assert_eq!(read2.num_rows(), 3);
+        assert!(reader.next_batch().is_none());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_record_batch_spill_auto_splitting() -> Result<()> {
+        let env = create_test_runtime_env()?;
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let metrics = SpillMetrics::new(&metrics_set, 0);
+
+        let schema = create_test_schema();
+        // Force splitting by setting a tiny threshold.
+        let mut writer = RecordBatchSpillWriter::try_new(
+            env,
+            schema.clone(),
+            "test_record_batch_spill_split",
+            SpillCompression::Uncompressed,
+            metrics.clone(),
+            Some(1),
+        )?;
+
+        let batch = create_test_record_batch(10);
+        writer.write_batch(batch)?;
+        let file = writer.finish()?;
+
+        // Rows should reflect the logical input rows, even if internally split.
+        assert_eq!(metrics.spilled_rows.value(), 10);
+        assert!(metrics.spilled_bytes.value() > 0);
+
+        // Reader should be able to read all rows back across multiple batches.
+        let mut reader = RecordBatchSpillReader::try_new(&file)?;
+        let mut total_rows = 0;
+        while let Some(batch) = reader.next_batch() {
+            total_rows += batch?.num_rows();
+        }
+        assert_eq!(total_rows, 10);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_record_batch_spill_sliced_binary_view_not_excessive() -> Result<()> {
+        let env = create_test_runtime_env()?;
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let metrics = SpillMetrics::new(&metrics_set, 0);
+
+        // Use a long payload so the view-value buffers dominate overhead, making the
+        // size comparison stable across platforms.
+        const NUM_ROWS: usize = 100;
+        const NUM_SLICES: usize = 10;
+        const VALUE_LEN: usize = 8 * 1024;
+
+        let batch = create_test_binary_view_batch(NUM_ROWS, VALUE_LEN);
+        let batch_size = get_record_batch_memory_size(&batch)?;
+
+        let mut writer = RecordBatchSpillWriter::try_new(
+            env,
+            batch.schema(),
+            "test_record_batch_spill_sliced_binary_view",
+            SpillCompression::Uncompressed,
+            metrics.clone(),
+            None,
+        )?;
+
+        let rows_per_slice = NUM_ROWS / NUM_SLICES;
+        assert_eq!(rows_per_slice * NUM_SLICES, NUM_ROWS);
+        for i in 0..NUM_SLICES {
+            let slice = batch.slice(i * rows_per_slice, rows_per_slice);
+            writer.write_batch(slice)?;
+        }
+
+        let file = writer.finish()?;
+        let spill_size = file.current_disk_usage() as usize;
+        assert!(
+            spill_size <= (batch_size as f64 * 1.2) as usize,
+            "spill file unexpectedly large for sliced BinaryView batch: 
spill_size={spill_size}, batch_size={batch_size}"
+        );
+
+        Ok(())
+    }
+}

