This is an automated email from the ASF dual-hosted git repository.

kontinuation pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new 71123806 feat(rust/sedona-spatial-join): Add a repartitioner to write 
spatially repartitioned data to spill files (#527)
71123806 is described below

commit 71123806312a23ca0550dc341360c90590f1aee8
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Wed Jan 21 14:43:04 2026 +0800

    feat(rust/sedona-spatial-join): Add a repartitioner to write spatially 
repartitioned data to spill files (#527)
    
    This implements part of https://github.com/apache/sedona-db/issues/436. The 
build side and probe side data will be spatially repartitioned using 
`StreamRepartitioner` introduced by this patch when the build side does not fit 
in memory. The code that integrates `StreamRepartitioner` with 
`SpatialJoinExec` and `SpatialJoinStream` will be submitted later.
    
    Co-authored-by: Copilot <[email protected]>
---
 rust/sedona-functions/src/st_analyze_agg.rs        |   30 +-
 rust/sedona-geometry/src/analyze.rs                |    9 +-
 rust/sedona-spatial-join/Cargo.toml                |    5 +
 .../bench/partitioning/stream_repartitioner.rs     |  243 ++++
 rust/sedona-spatial-join/src/evaluated_batch.rs    |    5 +
 .../src/evaluated_batch/evaluated_batch_stream.rs  |    6 +-
 .../evaluated_batch_stream/in_mem.rs               |    2 +-
 .../src/index/build_side_collector.rs              |    2 +-
 rust/sedona-spatial-join/src/operand_evaluator.rs  |    3 +
 rust/sedona-spatial-join/src/partitioning.rs       |    2 +
 .../src/partitioning/partition_slots.rs            |   85 ++
 .../src/partitioning/stream_repartitioner.rs       | 1359 ++++++++++++++++++++
 rust/sedona-spatial-join/src/partitioning/util.rs  |    7 +
 rust/sedona-spatial-join/src/stream.rs             |    2 +-
 rust/sedona-spatial-join/src/utils/spill.rs        |    8 +-
 15 files changed, 1741 insertions(+), 27 deletions(-)

diff --git a/rust/sedona-functions/src/st_analyze_agg.rs 
b/rust/sedona-functions/src/st_analyze_agg.rs
index 9dd49b26..82647a6d 100644
--- a/rust/sedona-functions/src/st_analyze_agg.rs
+++ b/rust/sedona-functions/src/st_analyze_agg.rs
@@ -34,7 +34,7 @@ use sedona_expr::aggregate_udf::SedonaAccumulatorRef;
 use sedona_expr::aggregate_udf::SedonaAggregateUDF;
 use sedona_expr::item_crs::ItemCrsSedonaAccumulator;
 use sedona_expr::{aggregate_udf::SedonaAccumulator, statistics::GeoStatistics};
-use sedona_geometry::analyze::GeometryAnalysis;
+use sedona_geometry::analyze::GeometrySummary;
 use sedona_geometry::interval::IntervalTrait;
 use sedona_geometry::types::{GeometryTypeAndDimensions, 
GeometryTypeAndDimensionsSet};
 use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher};
@@ -246,25 +246,29 @@ impl AnalyzeAccumulator {
         }
     }
 
-    pub fn update_statistics(&mut self, geom: &Wkb, size_bytes: usize) -> 
Result<()> {
+    pub fn update_statistics(&mut self, geom: &Wkb) -> Result<()> {
         // Get geometry analysis information
-        let analysis = sedona_geometry::analyze::analyze_geometry(geom)
+        let summary = sedona_geometry::analyze::analyze_geometry(geom)
             .map_err(|e| DataFusionError::External(Box::new(e)))?;
 
+        self.ingest_geometry_summary(&summary);
+
+        Ok(())
+    }
+
+    pub fn ingest_geometry_summary(&mut self, summary: &GeometrySummary) {
         // Start with a clone of the current stats
         let mut stats = self.stats.clone();
 
         // Update each component of the statistics
-        stats = self.update_basic_counts(stats, size_bytes);
-        stats = self.update_geometry_type_counts(stats, &analysis);
-        stats = self.update_point_count(stats, analysis.point_count);
-        stats = self.update_envelope_info(stats, &analysis);
-        stats = self.update_geometry_types(stats, analysis.geometry_type);
+        stats = self.update_basic_counts(stats, summary.size_bytes);
+        stats = self.update_geometry_type_counts(stats, summary);
+        stats = self.update_point_count(stats, summary.point_count);
+        stats = self.update_envelope_info(stats, summary);
+        stats = self.update_geometry_types(stats, summary.geometry_type);
 
         // Assign the updated stats back to self.stats
         self.stats = stats;
-
-        Ok(())
     }
 
     pub fn finish(self) -> GeoStatistics {
@@ -284,7 +288,7 @@ impl AnalyzeAccumulator {
     fn update_geometry_type_counts(
         &self,
         stats: GeoStatistics,
-        analysis: &GeometryAnalysis,
+        analysis: &GeometrySummary,
     ) -> GeoStatistics {
         // Add the counts from analysis to existing stats
         let puntal = stats.puntal_count().unwrap_or(0) + analysis.puntal_count;
@@ -309,7 +313,7 @@ impl AnalyzeAccumulator {
     fn update_envelope_info(
         &self,
         stats: GeoStatistics,
-        analysis: &GeometryAnalysis,
+        analysis: &GeometrySummary,
     ) -> GeoStatistics {
         // The bbox is directly available on analysis, not wrapped in an Option
         let bbox = &analysis.bbox;
@@ -368,7 +372,7 @@ impl AnalyzeAccumulator {
     fn execute_update(&mut self, executor: WkbExecutor) -> Result<()> {
         executor.execute_wkb_void(|maybe_item| {
             if let Some(item) = maybe_item {
-                self.update_statistics(&item, item.buf().len())?;
+                self.update_statistics(&item)?;
             }
             Ok(())
         })?;
diff --git a/rust/sedona-geometry/src/analyze.rs 
b/rust/sedona-geometry/src/analyze.rs
index e34e191b..aa01f11a 100644
--- a/rust/sedona-geometry/src/analyze.rs
+++ b/rust/sedona-geometry/src/analyze.rs
@@ -23,9 +23,10 @@ use crate::{
 };
 use wkb::reader::Wkb;
 
-/// Contains analysis results for a geometry
+/// Captures the size, bounds, and type-derived counts for a single geometry.
+/// Used as the per-geometry input that eventually feeds aggregated 
`GeoStatistics`.
 #[derive(Debug, Clone)]
-pub struct GeometryAnalysis {
+pub struct GeometrySummary {
     pub size_bytes: usize,
     pub point_count: i64,
     pub geometry_type: GeometryTypeAndDimensions,
@@ -37,7 +38,7 @@ pub struct GeometryAnalysis {
 }
 
 /// Analyzes a WKB geometry and returns its size, point count, dimensions, and 
type
-pub fn analyze_geometry(geom: &Wkb) -> Result<GeometryAnalysis, 
SedonaGeometryError> {
+pub fn analyze_geometry(geom: &Wkb) -> Result<GeometrySummary, 
SedonaGeometryError> {
     // Get size in bytes directly from WKB buffer
     let size_bytes = geom.buf().len();
 
@@ -74,7 +75,7 @@ pub fn analyze_geometry(geom: &Wkb) -> 
Result<GeometryAnalysis, SedonaGeometryEr
         GeometryTypeId::GeometryCollection
     ) as i64;
 
-    Ok(GeometryAnalysis {
+    Ok(GeometrySummary {
         size_bytes,
         point_count,
         geometry_type,
diff --git a/rust/sedona-spatial-join/Cargo.toml 
b/rust/sedona-spatial-join/Cargo.toml
index 9831c59b..010c2ead 100644
--- a/rust/sedona-spatial-join/Cargo.toml
+++ b/rust/sedona-spatial-join/Cargo.toml
@@ -100,3 +100,8 @@ harness = false
 name = "external_evaluated_batch_stream"
 path = "bench/evaluated_batch/external_evaluated_batch_stream.rs"
 harness = false
+
+[[bench]]
+name = "stream_repartitioner"
+path = "bench/partitioning/stream_repartitioner.rs"
+harness = false
diff --git 
a/rust/sedona-spatial-join/bench/partitioning/stream_repartitioner.rs 
b/rust/sedona-spatial-join/bench/partitioning/stream_repartitioner.rs
new file mode 100644
index 00000000..62641a43
--- /dev/null
+++ b/rust/sedona-spatial-join/bench/partitioning/stream_repartitioner.rs
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::{
+    atomic::{AtomicU64, Ordering},
+    Arc,
+};
+use std::time::Duration;
+
+use arrow_array::{
+    ArrayRef, BinaryArray, Date32Array, Int64Array, RecordBatch, StringArray,
+    TimestampMicrosecondArray,
+};
+use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use criterion::{criterion_group, criterion_main, BatchSize, Criterion, 
Throughput};
+use datafusion::config::SpillCompression;
+use datafusion_execution::runtime_env::RuntimeEnv;
+use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
+use futures::executor::block_on;
+use rand::{rngs::StdRng, Rng, SeedableRng};
+use sedona_geometry::{bounding_box::BoundingBox, interval::IntervalTrait};
+use sedona_schema::datatypes::WKB_GEOMETRY;
+use sedona_spatial_join::evaluated_batch::{
+    evaluated_batch_stream::{in_mem::InMemoryEvaluatedBatchStream, 
SendableEvaluatedBatchStream},
+    EvaluatedBatch,
+};
+use sedona_spatial_join::operand_evaluator::EvaluatedGeometryArray;
+use sedona_spatial_join::partitioning::PartitionedSide;
+use sedona_spatial_join::partitioning::{
+    kdb::KDBPartitioner, stream_repartitioner::StreamRepartitioner, 
SpatialPartitioner,
+};
+
+const RNG_SEED: u64 = 0x05ED_04A5;
+const NUM_BATCHES: usize = 50;
+const ROWS_PER_BATCH: usize = 8192;
+const SAMPLE_FOR_PARTITIONER: usize = 1_000;
+const MAX_ITEMS_PER_NODE: usize = 128;
+const MAX_LEVELS: usize = 4;
+const REPARTITIONER_BUFFER_BYTES: usize = 8 * 1024 * 1024;
+
+fn bench_stream_partitioner(c: &mut Criterion) {
+    let extent = Arc::new(default_extent());
+    let partitioner = build_partitioner(extent.as_ref());
+    let schema = Arc::new(build_schema());
+    let runtime_env = Arc::new(RuntimeEnv::default());
+    let metrics_set = ExecutionPlanMetricsSet::new();
+    let spill_metrics = SpillMetrics::new(&metrics_set, 0);
+    let seed_counter = Arc::new(AtomicU64::new(RNG_SEED));
+
+    let mut group = c.benchmark_group("stream_partitioner_repartition");
+    group.throughput(Throughput::Elements((NUM_BATCHES * ROWS_PER_BATCH) as 
u64));
+
+    group.bench_function("kdb_repartition", |b| {
+        let seed_counter = Arc::clone(&seed_counter);
+        let schema = Arc::clone(&schema);
+        let runtime_env = Arc::clone(&runtime_env);
+        let partitioner = Arc::clone(&partitioner);
+        let spill_metrics = spill_metrics.clone();
+        let extent = Arc::clone(&extent);
+
+        b.iter_batched(
+            move || {
+                let seed = seed_counter.fetch_add(1, Ordering::Relaxed);
+                generate_stream(seed, schema.clone(), extent.as_ref())
+            },
+            move |stream| {
+                block_on(async {
+                    StreamRepartitioner::builder(
+                        runtime_env.clone(),
+                        partitioner.clone(),
+                        PartitionedSide::BuildSide,
+                        spill_metrics.clone(),
+                    )
+                    .spill_compression(SpillCompression::Uncompressed)
+                    .buffer_bytes_threshold(REPARTITIONER_BUFFER_BYTES)
+                    .target_batch_size(ROWS_PER_BATCH)
+                    .spilled_batch_in_memory_size_threshold(None)
+                    .build()
+                    .repartition_stream(stream)
+                    .await
+                    .expect("repartition should succeed in benchmark");
+                });
+            },
+            BatchSize::SmallInput,
+        );
+    });
+
+    group.finish();
+}
+
+fn generate_stream(
+    seed: u64,
+    schema: Arc<Schema>,
+    extent: &BoundingBox,
+) -> SendableEvaluatedBatchStream {
+    let mut rng = StdRng::seed_from_u64(seed);
+    let mut batches = Vec::with_capacity(NUM_BATCHES);
+    for _ in 0..NUM_BATCHES {
+        batches.push(random_evaluated_batch(
+            schema.clone(),
+            ROWS_PER_BATCH,
+            extent,
+            &mut rng,
+        ));
+    }
+    in_memory_stream(schema, batches)
+}
+
+fn random_evaluated_batch(
+    schema: Arc<Schema>,
+    rows: usize,
+    extent: &BoundingBox,
+    rng: &mut StdRng,
+) -> EvaluatedBatch {
+    let batch = random_record_batch(schema, rows, rng);
+    let geom_array = random_geometry_array(rows, extent, rng);
+    let geom_array = EvaluatedGeometryArray::try_new(geom_array, &WKB_GEOMETRY)
+        .expect("geometry array allocation should succeed");
+    EvaluatedBatch { batch, geom_array }
+}
+
+fn random_record_batch(schema: Arc<Schema>, rows: usize, rng: &mut StdRng) -> 
RecordBatch {
+    let ids = Int64Array::from_iter_values((0..rows).map(|_| 
rng.random_range(0..1_000_000_i64)));
+    let words = StringArray::from_iter_values((0..rows).map(|_| 
random_string(rng)));
+    let dates = Date32Array::from_iter_values((0..rows).map(|_| 
rng.random_range(18_000..20_000)));
+    let timestamps = TimestampMicrosecondArray::from_iter_values(
+        (0..rows).map(|_| 
rng.random_range(1_600_000_000_000_000i64..1_700_000_000_000_000)),
+    );
+
+    let columns: Vec<ArrayRef> = vec![
+        Arc::new(ids),
+        Arc::new(words),
+        Arc::new(dates),
+        Arc::new(timestamps),
+    ];
+
+    RecordBatch::try_new(schema, columns).expect("record batch assembly should 
succeed")
+}
+
+fn random_geometry_array(rows: usize, extent: &BoundingBox, rng: &mut StdRng) 
-> ArrayRef {
+    let wkbs: Vec<Vec<u8>> = (0..rows)
+        .map(|_| {
+            let x = rng.random_range(extent.x().lo()..=extent.x().hi());
+            let y = rng.random_range(extent.y().lo()..=extent.y().hi());
+            point_wkb(x, y)
+        })
+        .collect();
+
+    let binary = BinaryArray::from_iter_values(wkbs.iter().map(|wkb| 
wkb.as_slice()));
+    Arc::new(binary)
+}
+
+fn random_string(rng: &mut StdRng) -> String {
+    const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz";
+    let mut buf = [0u8; 8];
+    for slot in &mut buf {
+        let idx = rng.random_range(0..CHARSET.len());
+        *slot = CHARSET[idx];
+    }
+    String::from_utf8_lossy(&buf).to_string()
+}
+
+fn build_schema() -> Schema {
+    Schema::new(vec![
+        Field::new("id", DataType::Int64, false),
+        Field::new("category", DataType::Utf8, true),
+        Field::new("event_date", DataType::Date32, true),
+        Field::new(
+            "event_ts",
+            DataType::Timestamp(TimeUnit::Microsecond, None),
+            true,
+        ),
+    ])
+}
+
+fn build_partitioner(extent: &BoundingBox) -> Arc<dyn SpatialPartitioner + 
Send + Sync> {
+    let mut rng = StdRng::seed_from_u64(RNG_SEED ^ 0x00FF_FFFF);
+    let samples = (0..SAMPLE_FOR_PARTITIONER)
+        .map(|_| random_bbox(extent, &mut rng))
+        .collect::<Vec<_>>();
+
+    let partitioner = KDBPartitioner::build(
+        samples.into_iter(),
+        MAX_ITEMS_PER_NODE,
+        MAX_LEVELS,
+        extent.clone(),
+    )
+    .expect("kdb builder should succeed");
+
+    Arc::new(partitioner)
+}
+
+fn random_bbox(extent: &BoundingBox, rng: &mut StdRng) -> BoundingBox {
+    let span_x = (extent.x().hi() - extent.x().lo()) / 20.0;
+    let span_y = (extent.y().hi() - extent.y().lo()) / 20.0;
+    let width = rng.random_range(10.0..=span_x).max(1.0);
+    let height = rng.random_range(10.0..=span_y).max(1.0);
+    let min_x = rng.random_range(extent.x().lo()..=extent.x().hi() - width);
+    let min_y = rng.random_range(extent.y().lo()..=extent.y().hi() - height);
+    BoundingBox::xy((min_x, min_x + width), (min_y, min_y + height))
+}
+
+fn default_extent() -> BoundingBox {
+    BoundingBox::xy((0.0, 10_000.0), (0.0, 10_000.0))
+}
+
+fn point_wkb(x: f64, y: f64) -> Vec<u8> {
+    let mut buf = vec![1u8, 1, 0, 0, 0];
+    buf.extend_from_slice(&x.to_le_bytes());
+    buf.extend_from_slice(&y.to_le_bytes());
+    buf
+}
+
+criterion_group! {
+    name = stream_partitioner;
+    config = Criterion::default()
+        .sample_size(10)
+        .measurement_time(Duration::from_secs(4))
+        .warm_up_time(Duration::from_secs(2));
+    targets = bench_stream_partitioner
+}
+criterion_main!(stream_partitioner);
+
+fn in_memory_stream(
+    schema: Arc<Schema>,
+    batches: Vec<EvaluatedBatch>,
+) -> SendableEvaluatedBatchStream {
+    Box::pin(InMemoryEvaluatedBatchStream::new(schema, batches))
+}
diff --git a/rust/sedona-spatial-join/src/evaluated_batch.rs 
b/rust/sedona-spatial-join/src/evaluated_batch.rs
index aad3f11b..7fa0cd79 100644
--- a/rust/sedona-spatial-join/src/evaluated_batch.rs
+++ b/rust/sedona-spatial-join/src/evaluated_batch.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
 use datafusion_common::Result;
 use datafusion_expr::ColumnarValue;
 use geo::Rect;
@@ -47,6 +48,10 @@ impl EvaluatedBatch {
         Ok(record_batch_size + geom_array_size)
     }
 
+    pub fn schema(&self) -> SchemaRef {
+        self.batch.schema()
+    }
+
     pub fn num_rows(&self) -> usize {
         self.batch.num_rows()
     }
diff --git 
a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs 
b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs
index c18761d2..49b4a6f4 100644
--- a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs
+++ b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs
@@ -25,7 +25,7 @@ use datafusion_common::Result;
 
 /// A stream that produces [`EvaluatedBatch`] items. This stream may have 
purely in-memory or
 /// out-of-core implementations. The type of the stream could be queried 
calling `is_external()`.
-pub(crate) trait EvaluatedBatchStream: Stream<Item = Result<EvaluatedBatch>> {
+pub trait EvaluatedBatchStream: Stream<Item = Result<EvaluatedBatch>> {
     /// Returns true if this stream is an external stream, where batch data 
were spilled to disk.
     fn is_external(&self) -> bool;
 
@@ -36,8 +36,8 @@ pub(crate) trait EvaluatedBatchStream: Stream<Item = 
Result<EvaluatedBatch>> {
     fn schema(&self) -> SchemaRef;
 }
 
-pub(crate) type SendableEvaluatedBatchStream = Pin<Box<dyn 
EvaluatedBatchStream + Send>>;
+pub type SendableEvaluatedBatchStream = Pin<Box<dyn EvaluatedBatchStream + 
Send>>;
 
 pub(crate) mod evaluate;
 pub mod external;
-pub(crate) mod in_mem;
+pub mod in_mem;
diff --git 
a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs 
b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs
index 550e308a..4ec4ba9c 100644
--- 
a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs
+++ 
b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs
@@ -27,7 +27,7 @@ use datafusion_common::Result;
 
 use crate::evaluated_batch::{evaluated_batch_stream::EvaluatedBatchStream, 
EvaluatedBatch};
 
-pub(crate) struct InMemoryEvaluatedBatchStream {
+pub struct InMemoryEvaluatedBatchStream {
     schema: SchemaRef,
     iter: IntoIter<EvaluatedBatch>,
 }
diff --git a/rust/sedona-spatial-join/src/index/build_side_collector.rs 
b/rust/sedona-spatial-join/src/index/build_side_collector.rs
index 9230aaf4..90ebb05f 100644
--- a/rust/sedona-spatial-join/src/index/build_side_collector.rs
+++ b/rust/sedona-spatial-join/src/index/build_side_collector.rs
@@ -103,7 +103,7 @@ impl BuildSideBatchesCollector {
             // Process the record batch and create a BuildSideBatch
             let geom_array = &build_side_batch.geom_array;
             for wkb in geom_array.wkbs().iter().flatten() {
-                analyzer.update_statistics(wkb, wkb.buf().len())?;
+                analyzer.update_statistics(wkb)?;
             }
 
             let in_mem_size = build_side_batch.in_mem_size()?;
diff --git a/rust/sedona-spatial-join/src/operand_evaluator.rs 
b/rust/sedona-spatial-join/src/operand_evaluator.rs
index 8b431396..a8b83264 100644
--- a/rust/sedona-spatial-join/src/operand_evaluator.rs
+++ b/rust/sedona-spatial-join/src/operand_evaluator.rs
@@ -91,6 +91,8 @@ pub(crate) fn create_operand_evaluator(
 
 /// Result of evaluating a geometry batch.
 pub struct EvaluatedGeometryArray {
+    /// Type of geometry_array
+    pub sedona_type: SedonaType,
     /// The array of geometries produced by evaluating the geometry expression.
     pub geometry_array: ArrayRef,
     /// The rects of the geometries in the geometry array. The length of this 
array is equal to the number of geometries.
@@ -139,6 +141,7 @@ impl EvaluatedGeometryArray {
             .map(|wkb| wkb.map(|wkb| unsafe { transmute(wkb) }))
             .collect();
         Ok(Self {
+            sedona_type: sedona_type.clone(),
             geometry_array,
             rects: rect_vec,
             distance: None,
diff --git a/rust/sedona-spatial-join/src/partitioning.rs 
b/rust/sedona-spatial-join/src/partitioning.rs
index d20eb523..fe495f6b 100644
--- a/rust/sedona-spatial-join/src/partitioning.rs
+++ b/rust/sedona-spatial-join/src/partitioning.rs
@@ -20,7 +20,9 @@ use sedona_geometry::bounding_box::BoundingBox;
 
 pub mod flat;
 pub mod kdb;
+pub(crate) mod partition_slots;
 pub mod rtree;
+pub mod stream_repartitioner;
 pub(crate) mod util;
 
 /// Spatial partitioning is different from traditional data partitioning such 
as hash partitioning.
diff --git a/rust/sedona-spatial-join/src/partitioning/partition_slots.rs 
b/rust/sedona-spatial-join/src/partitioning/partition_slots.rs
new file mode 100644
index 00000000..3916cea0
--- /dev/null
+++ b/rust/sedona-spatial-join/src/partitioning/partition_slots.rs
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::partitioning::SpatialPartition;
+
+#[derive(Clone, Copy, Debug)]
+/// Maintains the slot mapping for all `SpatialPartition` variants, reserving
+/// contiguous indices for regular partitions plus dedicated None/Multi slots.
+pub struct PartitionSlots {
+    num_regular: usize,
+}
+
+impl PartitionSlots {
+    /// Create a slot manager for `num_regular` `SpatialPartition::Regular` 
entries.
+    /// Two additional slots are implicitly reserved: one for `None` and one 
for `Multi`.
+    pub fn new(num_regular: usize) -> Self {
+        Self { num_regular }
+    }
+
+    /// Return the total slot count (`Regular + None + Multi`).
+    pub fn total_slots(&self) -> usize {
+        self.num_regular + 2
+    }
+
+    /// Convert a `SpatialPartition` into its backing slot index.
+    pub fn slot(&self, partition: SpatialPartition) -> Option<usize> {
+        match partition {
+            SpatialPartition::Regular(id) => {
+                let id = id as usize;
+                if id < self.num_regular {
+                    Some(id)
+                } else {
+                    None
+                }
+            }
+            SpatialPartition::None => Some(self.none_slot()),
+            SpatialPartition::Multi => Some(self.multi_slot()),
+        }
+    }
+
+    /// Convert a slot index back into the corresponding `SpatialPartition` 
variant.
+    pub fn partition(&self, slot: usize) -> SpatialPartition {
+        if slot < self.num_regular {
+            SpatialPartition::Regular(slot as u32)
+        } else if slot == self.none_slot() {
+            SpatialPartition::None
+        } else if slot == self.multi_slot() {
+            SpatialPartition::Multi
+        } else {
+            panic!(
+                "invalid partition slot {slot} for {} regular partitions",
+                self.num_regular
+            );
+        }
+    }
+
+    /// Number of regular partitions
+    pub fn num_regular_partitions(&self) -> usize {
+        self.num_regular
+    }
+
+    /// Slot dedicated to `SpatialPartition::None`.
+    pub fn none_slot(&self) -> usize {
+        self.num_regular
+    }
+
+    /// Slot dedicated to `SpatialPartition::Multi`.
+    pub fn multi_slot(&self) -> usize {
+        self.num_regular + 1
+    }
+}
diff --git a/rust/sedona-spatial-join/src/partitioning/stream_repartitioner.rs 
b/rust/sedona-spatial-join/src/partitioning/stream_repartitioner.rs
new file mode 100644
index 00000000..44591107
--- /dev/null
+++ b/rust/sedona-spatial-join/src/partitioning/stream_repartitioner.rs
@@ -0,0 +1,1359 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Streaming spatial partitioning utilities.
+//!
+//! This module provides helpers that repartition an [`EvaluatedBatch`] stream 
into
+//! spatial spill files using a [`SpatialPartitioner`]. Each regular 
partition, along
+//! with the None and Multi partitions, gets at most one spill file which can
+//! later be replayed via [`SendableEvaluatedBatchStream`].
+
+use std::sync::Arc;
+
+use crate::{
+    evaluated_batch::{
+        evaluated_batch_stream::SendableEvaluatedBatchStream, 
spill::EvaluatedBatchSpillWriter,
+        EvaluatedBatch,
+    },
+    operand_evaluator::EvaluatedGeometryArray,
+    partitioning::{
+        partition_slots::PartitionSlots, util::geo_rect_to_bbox, 
PartitionedSide, SpatialPartition,
+        SpatialPartitioner,
+    },
+};
+use arrow::compute::interleave as arrow_interleave;
+use arrow::compute::interleave_record_batch;
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use datafusion::config::SpillCompression;
+use datafusion_common::{Result, ScalarValue};
+use datafusion_execution::{disk_manager::RefCountedTempFile, 
runtime_env::RuntimeEnv};
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_plan::metrics::SpillMetrics;
+use futures::StreamExt;
+use sedona_common::sedona_internal_err;
+use sedona_expr::statistics::GeoStatistics;
+use sedona_functions::st_analyze_agg::AnalyzeAccumulator;
+use sedona_geometry::bounding_box::BoundingBox;
+use sedona_geometry::interval::IntervalTrait;
+use sedona_schema::datatypes::WKB_GEOMETRY;
+
+/// Result emitted after a stream is spatially repartitioned.
+#[derive(Debug)]
+pub struct SpilledPartitions {
+    slots: PartitionSlots,
+    partitions: Vec<Option<SpilledPartition>>,
+}
+
+/// Metadata and spill files produced for a single spatial partition.
+///
+/// A `SpilledPartition` corresponds to one logical spatial partition 
(including
+/// special partitions such as `None` or `Multi`) after the stream 
repartitioner
+/// has flushed in-memory data to disk. It tracks the set of temporary spill
+/// files that hold the partition's rows, along with aggregated geospatial
+/// statistics and the total number of rows written.
+#[derive(Debug, Clone)]
+pub struct SpilledPartition {
+    /// Temporary spill files containing the rows assigned to this partition.
+    spill_files: Vec<Arc<RefCountedTempFile>>,
+    /// Aggregated geospatial statistics computed over all rows in this 
partition.
+    geo_statistics: GeoStatistics,
+    /// Total number of rows that were written into `spill_files`.
+    num_rows: usize,
+}
+
+impl SpilledPartition {
+    /// Construct a spilled partition from finalized spill files and 
aggregated statistics.
+    pub fn new(
+        spill_files: Vec<Arc<RefCountedTempFile>>,
+        geo_statistics: GeoStatistics,
+        num_rows: usize,
+    ) -> Self {
+        Self {
+            spill_files,
+            geo_statistics,
+            num_rows,
+        }
+    }
+
+    /// Create an empty spilled partition (no files, empty stats, zero rows).
+    pub fn empty() -> Self {
+        Self::new(Vec::new(), GeoStatistics::empty(), 0)
+    }
+
+    /// Spill files produced for this partition.
+    pub fn spill_files(&self) -> &[Arc<RefCountedTempFile>] {
+        &self.spill_files
+    }
+
+    /// Aggregated geospatial statistics for this partition.
+    pub fn geo_statistics(&self) -> &GeoStatistics {
+        &self.geo_statistics
+    }
+
+    /// Total number of rows assigned to this partition.
+    pub fn num_rows(&self) -> usize {
+        self.num_rows
+    }
+
+    /// Bounding box, if available from accumulated statistics.
+    pub fn bounding_box(&self) -> Option<&BoundingBox> {
+        self.geo_statistics.bbox()
+    }
+
+    /// Consume this value and return only the spill files.
+    pub fn into_spill_files(self) -> Vec<Arc<RefCountedTempFile>> {
+        self.spill_files
+    }
+
+    /// Consume this value and return `(spill_files, geo_statistics, 
num_rows)`.
+    pub fn into_inner(self) -> (Vec<Arc<RefCountedTempFile>>, GeoStatistics, 
usize) {
+        (self.spill_files, self.geo_statistics, self.num_rows)
+    }
+}
+
+impl SpilledPartitions {
+    /// Construct a new spilled-partitions container for the provided slots.
+    ///
+    /// `partitions` must contain one entry for every slot in `slots`.
+    pub fn new(slots: PartitionSlots, partitions: Vec<SpilledPartition>) -> 
Self {
+        assert_eq!(partitions.len(), slots.total_slots());
+        let partitions = partitions.into_iter().map(Some).collect();
+        Self { slots, partitions }
+    }
+
+    /// Number of regular partitions
+    pub fn num_regular_partitions(&self) -> usize {
+        self.slots.num_regular_partitions()
+    }
+
+    /// Get the slots for mapping spatial partitions to sequential 0-based 
indexes
+    pub fn slots(&self) -> PartitionSlots {
+        self.slots
+    }
+
+    /// Count of spill files that were actually materialized.
+    pub fn spill_file_count(&self) -> usize {
+        self.partitions
+            .iter()
+            .map(|partition| partition.as_ref().map_or(0, |p| 
p.spill_files().len()))
+            .sum()
+    }
+
+    /// Retrieve the spilled partition for a given spatial partition.
+    pub fn spilled_partition(&self, partition: SpatialPartition) -> 
Result<&SpilledPartition> {
+        let Some(slot) = self.slots.slot(partition) else {
+            return sedona_internal_err!(
+                "Invalid partition {:?} for {} regular partitions",
+                partition,
+                self.slots.num_regular_partitions()
+            );
+        };
+        match &self.partitions[slot] {
+            Some(spilled_partition) => Ok(spilled_partition),
+            None => sedona_internal_err!(
+                "Spilled partition {:?} has already been taken away",
+                partition
+            ),
+        }
+    }
+
+    /// Consume this structure into concrete spilled partitions.
+    pub fn into_spilled_partitions(self) -> Result<Vec<SpilledPartition>> {
+        let mut partitions = Vec::with_capacity(self.partitions.len());
+        for partition_opt in self.partitions {
+            match partition_opt {
+                Some(partition) => partitions.push(partition),
+                None => {
+                    return sedona_internal_err!(
+                        "Some of the spilled partitions have already been 
taken away"
+                    )
+                }
+            }
+        }
+        Ok(partitions)
+    }
+
+    /// Get a clone of the spill files in specified partition without 
consuming it. This is
+    /// mainly for retrieving the Multi partition, which may be scanned 
multiple times.
+    pub fn get_spilled_partition(&self, partition: SpatialPartition) -> 
Result<SpilledPartition> {
+        let Some(slot) = self.slots.slot(partition) else {
+            return sedona_internal_err!(
+                "Invalid partition {:?} for {} regular partitions",
+                partition,
+                self.slots.num_regular_partitions()
+            );
+        };
+        match &self.partitions[slot] {
+            Some(spilled_partition) => Ok(spilled_partition.clone()),
+            None => sedona_internal_err!(
+                "Spilled partition {:?} has already been taken away",
+                partition
+            ),
+        }
+    }
+
+    /// Take the spill files in specified partition from it without consuming 
this value. This is
+    /// mainly for retrieving the regular partitions, which will only be 
scanned once.
+    pub fn take_spilled_partition(
+        &mut self,
+        partition: SpatialPartition,
+    ) -> Result<SpilledPartition> {
+        let Some(slot) = self.slots.slot(partition) else {
+            return sedona_internal_err!(
+                "Invalid partition {:?} for {} regular partitions",
+                partition,
+                self.slots.num_regular_partitions()
+            );
+        };
+        match std::mem::take(&mut self.partitions[slot]) {
+            Some(spilled_partition) => Ok(spilled_partition),
+            None => sedona_internal_err!(
+                "Spilled partition {:?} has already been taken away",
+                partition
+            ),
+        }
+    }
+
+    /// Are the spill files still present and can they be taken away?
+    pub fn can_take_spilled_partition(&self, partition: SpatialPartition) -> 
bool {
+        let Some(slot) = self.slots.slot(partition) else {
+            return false;
+        };
+        self.partitions[slot].is_some()
+    }
+
+    /// Write debug info for this spilled partitions
+    pub fn debug_print(&self, f: &mut impl std::fmt::Write) -> 
std::fmt::Result {
+        for k in 0..self.slots.total_slots() {
+            if let Some(spilled_partition) = &self.partitions[k] {
+                let bbox_str = if let Some(bbox) = 
spilled_partition.bounding_box() {
+                    format!(
+                        "x: [{:.6}, {:.6}], y: [{:.6}, {:.6}]",
+                        bbox.x().lo(),
+                        bbox.x().hi(),
+                        bbox.y().lo(),
+                        bbox.y().hi()
+                    )
+                } else {
+                    "None".to_string()
+                };
+                let spill_files = spilled_partition.spill_files();
+                let spill_file_sizes = spill_files
+                    .iter()
+                    .map(|sp| {
+                        sp.inner()
+                            .as_file()
+                            .metadata()
+                            .map(|m| m.len())
+                            .unwrap_or(0)
+                    })
+                    .collect::<Vec<_>>();
+                writeln!(
+                    f,
+                    "Partition {:?}: {} spill file(s), num non-empty geoms: 
{:?}, bbox: {}, spill file sizes: {:?}",
+                    self.slots.partition(k),
+                    spilled_partition.spill_files().len(),
+                    spilled_partition
+                        .geo_statistics()
+                        .total_geometries()
+                        .unwrap_or_default(),
+                    bbox_str,
+                    spill_file_sizes,
+                )?;
+            } else {
+                writeln!(f, "Partition {}: already taken away", k)?;
+            }
+        }
+        Ok(())
+    }
+}
+
+/// Incremental (stateful) repartitioner for an [`EvaluatedBatch`] stream.
+///
+/// This type assigns each incoming row to a [`SpatialPartition`] (based on a
+/// [`SpatialPartitioner`]) and writes the partitioned output into spill files.
+///
+/// It buffers incoming data and keeps per-partition spill writers open across
+/// batches to amortize setup cost. Flushing is controlled via
+/// `buffer_bytes_threshold`, and output is optionally chunked to approximately
+/// `target_batch_size` rows per partition batch.
+pub struct StreamRepartitioner {
+    runtime_env: Arc<RuntimeEnv>,
+    partitioner: Arc<dyn SpatialPartitioner>,
+    partitioned_side: PartitionedSide,
+    slots: PartitionSlots,
+    /// Spill files for each spatial partition.
+    /// The None and Multi partitions should be None when repartitioning the 
build side.
+    spill_registry: Vec<Option<EvaluatedBatchSpillWriter>>,
+    /// Geospatial statistics for each spatial partition.
+    geo_stats_accumulators: Vec<AnalyzeAccumulator>,
+    /// Number of rows in each spatial partition.
+    num_rows: Vec<usize>,
+    slot_assignments: Vec<Vec<(usize, usize)>>,
+    row_assignments_buffer: Vec<SpatialPartition>,
+    spill_compression: SpillCompression,
+    spill_metrics: SpillMetrics,
+    buffer_bytes_threshold: usize,
+    target_batch_size: usize,
+    spilled_batch_in_memory_size_threshold: Option<usize>,
+    pending_batches: Vec<EvaluatedBatch>,
+    pending_bytes: usize,
+}
+
+/// Builder for configuring and constructing a [`StreamRepartitioner`].
+///
+/// Defaults are chosen to be safe and explicit:
+/// - `spill_compression`: [`SpillCompression::Uncompressed`]
+/// - `buffer_bytes_threshold`: `0` (flush on every inserted batch)
+/// - `target_batch_size`: `0` (do not chunk; emit one batch per partition 
flush)
+/// - `spilled_batch_in_memory_size_threshold`: `None`
+pub struct StreamRepartitionerBuilder {
+    runtime_env: Arc<RuntimeEnv>,
+    partitioner: Arc<dyn SpatialPartitioner>,
+    partitioned_side: PartitionedSide,
+    spill_compression: SpillCompression,
+    spill_metrics: SpillMetrics,
+    buffer_bytes_threshold: usize,
+    target_batch_size: usize,
+    spilled_batch_in_memory_size_threshold: Option<usize>,
+}
+
+impl StreamRepartitionerBuilder {
+    /// Set spill compression applied to newly created per-partition spill 
files.
+    pub fn spill_compression(mut self, spill_compression: SpillCompression) -> 
Self {
+        self.spill_compression = spill_compression;
+        self
+    }
+
+    /// Set the in-memory buffering threshold (in bytes).
+    ///
+    /// When the buffered in-memory size meets/exceeds this threshold, pending
+    /// rows are flushed to partition writers.
+    pub fn buffer_bytes_threshold(mut self, buffer_bytes_threshold: usize) -> 
Self {
+        self.buffer_bytes_threshold = buffer_bytes_threshold;
+        self
+    }
+
+    /// Set the target maximum number of rows per flushed batch (per 
partition).
+    ///
+    /// A value of `0` disables chunking.
+    pub fn target_batch_size(mut self, target_batch_size: usize) -> Self {
+        self.target_batch_size = target_batch_size;
+        self
+    }
+
+    /// Set an optional threshold used by spill writers to decide whether to 
keep a
+    /// batch in memory vs. spilling.
+    pub fn spilled_batch_in_memory_size_threshold(
+        mut self,
+        spilled_batch_in_memory_size_threshold: Option<usize>,
+    ) -> Self {
+        self.spilled_batch_in_memory_size_threshold = 
spilled_batch_in_memory_size_threshold;
+        self
+    }
+
+    /// Build a [`StreamRepartitioner`] with the configured parameters.
+    pub fn build(self) -> StreamRepartitioner {
+        let slots = 
PartitionSlots::new(self.partitioner.num_regular_partitions());
+        let slot_count = slots.total_slots();
+        StreamRepartitioner {
+            runtime_env: self.runtime_env,
+            partitioner: self.partitioner,
+            partitioned_side: self.partitioned_side,
+            slots,
+            spill_registry: (0..slot_count).map(|_| None).collect(),
+            geo_stats_accumulators: (0..slot_count)
+                .map(|_| AnalyzeAccumulator::new(WKB_GEOMETRY, WKB_GEOMETRY))
+                .collect(),
+            num_rows: vec![0; slot_count],
+            slot_assignments: (0..slot_count).map(|_| Vec::new()).collect(),
+            row_assignments_buffer: Vec::new(),
+            spill_compression: self.spill_compression,
+            spill_metrics: self.spill_metrics,
+            buffer_bytes_threshold: self.buffer_bytes_threshold,
+            target_batch_size: self.target_batch_size,
+            spilled_batch_in_memory_size_threshold: 
self.spilled_batch_in_memory_size_threshold,
+            pending_batches: Vec::new(),
+            pending_bytes: 0,
+        }
+    }
+}
+
+impl StreamRepartitioner {
+    /// Start building a new [`StreamRepartitioner`].
+    ///
+    /// This captures the required configuration (runtime, partitioner, side, 
and
+    /// spill metrics). Optional parameters can then be set on the returned 
builder.
+    pub fn builder(
+        runtime_env: Arc<RuntimeEnv>,
+        partitioner: Arc<dyn SpatialPartitioner>,
+        partitioned_side: PartitionedSide,
+        spill_metrics: SpillMetrics,
+    ) -> StreamRepartitionerBuilder {
+        StreamRepartitionerBuilder {
+            runtime_env,
+            partitioner,
+            partitioned_side,
+            spill_compression: SpillCompression::Uncompressed,
+            spill_metrics,
+            buffer_bytes_threshold: 0,
+            target_batch_size: 0,
+            spilled_batch_in_memory_size_threshold: None,
+        }
+    }
+
+    /// Repartition a stream of evaluated batches into per-partition spill 
files.
+    ///
+    /// This consumes the repartitioner and returns [`SpilledPartitions`] once 
the
+    /// input stream is exhausted.
+    pub async fn repartition_stream(
+        mut self,
+        mut stream: SendableEvaluatedBatchStream,
+    ) -> Result<SpilledPartitions> {
+        while let Some(batch_result) = stream.next().await {
+            let batch = batch_result?;
+            self.repartition_batch(batch)?;
+        }
+        self.finish()
+    }
+
+    /// Route a single evaluated batch into its corresponding spill writers.
+    ///
+    /// This runs the spatial partitioner to compute row assignments, buffers 
the
+    /// batch, and may flush pending buffered data depending on configuration.
+    pub fn repartition_batch(&mut self, batch: EvaluatedBatch) -> Result<()> {
+        let mut row_assignments = std::mem::take(&mut 
self.row_assignments_buffer);
+        assign_rows(
+            &batch,
+            self.partitioner.as_ref(),
+            self.partitioned_side,
+            &mut row_assignments,
+        )?;
+        self.insert_repartitioned_batch(batch, &row_assignments)?;
+        self.row_assignments_buffer = row_assignments;
+        Ok(())
+    }
+
+    /// Insert batch with row assignments into the repartitioner. The spatial 
partitioner
+    /// does not need to be invoked in this method. This is useful when the 
batch has
+    /// already been partitioned by calling assign_rows.
+    ///
+    /// `row_assignments` must have the same length as the batch row count and 
contain
+    /// only partitions valid for the configured [`SpatialPartitioner`].
+    pub fn insert_repartitioned_batch(
+        &mut self,
+        batch: EvaluatedBatch,
+        row_assignments: &[SpatialPartition],
+    ) -> Result<()> {
+        let batch_idx = self.pending_batches.len();
+        self.pending_bytes += batch.in_mem_size()?;
+        self.pending_batches.push(batch);
+        let batch_ref = &self.pending_batches[batch_idx];
+        assert_eq!(row_assignments.len(), batch_ref.num_rows());
+        for (row_idx, partition) in row_assignments.iter().enumerate() {
+            let Some(slot_idx) = self.slots.slot(*partition) else {
+                return sedona_internal_err!(
+                    "Invalid partition {:?} for {} regular partitions",
+                    partition,
+                    self.slots.num_regular_partitions()
+                );
+            };
+            if let Some(wkb) = batch_ref.wkb(row_idx) {
+                self.geo_stats_accumulators[slot_idx].update_statistics(wkb)?;
+            }
+            self.slot_assignments[slot_idx].push((batch_idx, row_idx));
+            self.num_rows[slot_idx] += 1;
+        }
+        let threshold = self.buffer_bytes_threshold;
+        if threshold == 0 || self.pending_bytes >= threshold {
+            self.flush_pending_batches()?;
+        }
+        Ok(())
+    }
+
+    fn flush_pending_batches(&mut self) -> Result<()> {
+        if self.pending_batches.is_empty() {
+            debug_assert!(self
+                .slot_assignments
+                .iter()
+                .all(|assignments| assignments.is_empty()));
+            return Ok(());
+        }
+
+        let pending_batches = std::mem::take(&mut self.pending_batches);
+        self.pending_bytes = 0;
+
+        let record_batches: Vec<&RecordBatch> =
+            pending_batches.iter().map(|batch| &batch.batch).collect();
+        let geom_arrays: Vec<&EvaluatedGeometryArray> = pending_batches
+            .iter()
+            .map(|batch| &batch.geom_array)
+            .collect();
+
+        let mut slot_assignments = std::mem::take(&mut self.slot_assignments);
+
+        for (slot_idx, assignments) in slot_assignments.iter_mut().enumerate() 
{
+            if assignments.is_empty() {
+                continue;
+            }
+            let chunk_cap = if self.target_batch_size == 0 {
+                assignments.len()
+            } else {
+                self.target_batch_size
+            }
+            .max(1);
+            for chunk in assignments.chunks(chunk_cap) {
+                let sliced_batch =
+                    interleave_evaluated_batch(&record_batches, &geom_arrays, 
chunk)?;
+                let writer = self.ensure_writer(slot_idx, &sliced_batch)?;
+                writer.append(&sliced_batch)?;
+            }
+
+            assignments.clear();
+        }
+
+        self.slot_assignments = slot_assignments;
+        Ok(())
+    }
+
+    /// Seal every partition and return their associated spill files and 
bounds.
+    ///
+    /// This flushes any buffered rows, closes all partition writers, and 
returns a
+    /// [`SpilledPartitions`] summary.
+    pub fn finish(mut self) -> Result<SpilledPartitions> {
+        self.flush_pending_batches()?;
+        let slot_count = self.slots.total_slots();
+        let mut spilled_partition_vec = Vec::with_capacity(slot_count);
+        for ((writer_opt, accumulator), num_rows) in self
+            .spill_registry
+            .into_iter()
+            .zip(self.geo_stats_accumulators.into_iter())
+            .zip(self.num_rows.into_iter())
+        {
+            let spilled_partition = if let Some(writer) = writer_opt {
+                let spill_files = vec![Arc::new(writer.finish()?)];
+                let geo_statistics = accumulator.finish();
+                SpilledPartition::new(spill_files, geo_statistics, num_rows)
+            } else {
+                SpilledPartition::empty()
+            };
+            spilled_partition_vec.push(spilled_partition);
+        }
+
+        Ok(SpilledPartitions::new(self.slots, spilled_partition_vec))
+    }
+
+    fn ensure_writer(
+        &mut self,
+        slot_idx: usize,
+        batch: &EvaluatedBatch,
+    ) -> Result<&mut EvaluatedBatchSpillWriter> {
+        if self.spill_registry[slot_idx].is_none() {
+            self.spill_registry[slot_idx] = 
Some(EvaluatedBatchSpillWriter::try_new(
+                Arc::clone(&self.runtime_env),
+                batch.schema(),
+                &batch.geom_array.sedona_type,
+                "streaming repartitioner",
+                self.spill_compression,
+                self.spill_metrics.clone(),
+                self.spilled_batch_in_memory_size_threshold,
+            )?);
+        }
+        Ok(self.spill_registry[slot_idx]
+            .as_mut()
+            .expect("writer inserted above"))
+    }
+}
+
+/// Populate `assignments` with the spatial partition for every row in `batch`,
+/// reusing the provided buffer to avoid repeated allocations. The vector 
length
+/// after this call matches `batch.rects().len()` and each entry records which
+/// [`SpatialPartition`] the corresponding row belongs to.
+pub(crate) fn assign_rows(
+    batch: &EvaluatedBatch,
+    partitioner: &dyn SpatialPartitioner,
+    partitioned_side: PartitionedSide,
+    assignments: &mut Vec<SpatialPartition>,
+) -> Result<()> {
+    assignments.clear();
+    assignments.reserve(batch.rects().len());
+
+    match partitioned_side {
+        PartitionedSide::BuildSide => {
+            let mut cnt = 0;
+            let num_regular_partitions = partitioner.num_regular_partitions() 
as u32;
+            for rect_opt in batch.rects() {
+                let partition = match rect_opt {
+                    Some(rect) => 
partitioner.partition_no_multi(&geo_rect_to_bbox(rect))?,
+                    None => {
+                        // Round-robin empty geometries through regular 
partitions to avoid
+                        // overloading a single slot when the build side is 
mostly empty.
+                        let p = SpatialPartition::Regular(cnt);
+                        cnt = (cnt + 1) % num_regular_partitions;
+                        p
+                    }
+                };
+                assignments.push(partition);
+            }
+        }
+        PartitionedSide::ProbeSide => {
+            for rect_opt in batch.rects() {
+                let partition = match rect_opt {
+                    Some(rect) => 
partitioner.partition(&geo_rect_to_bbox(rect))?,
+                    None => SpatialPartition::None,
+                };
+                assignments.push(partition);
+            }
+        }
+    }
+
+    Ok(())
+}
+
+/// Build a new [`EvaluatedBatch`] by interleaving rows from the provided
+/// `record_batches`/`geom_arrays` inputs according to `assignments`. Each pair
+/// in `assignments` identifies the source batch index and row index that 
should
+/// appear in the output in order, ensuring the geometry metadata stays aligned
+/// with the Arrow row data.
+pub(crate) fn interleave_evaluated_batch(
+    record_batches: &[&RecordBatch],
+    geom_arrays: &[&EvaluatedGeometryArray],
+    indices: &[(usize, usize)],
+) -> Result<EvaluatedBatch> {
+    if record_batches.is_empty() || geom_arrays.is_empty() {
+        return sedona_internal_err!("interleave_evaluated_batch requires at 
least one batch");
+    }
+    let batch = interleave_record_batch(record_batches, indices)?;
+    let geom_array = interleave_geometry_array(geom_arrays, indices)?;
+    Ok(EvaluatedBatch { batch, geom_array })
+}
+
+fn interleave_geometry_array(
+    geom_arrays: &[&EvaluatedGeometryArray],
+    indices: &[(usize, usize)],
+) -> Result<EvaluatedGeometryArray> {
+    if geom_arrays.is_empty() {
+        return sedona_internal_err!("interleave_geometry_array requires at 
least one batch");
+    }
+    let sedona_type = &geom_arrays[0].sedona_type;
+    let value_refs: Vec<&dyn Array> = geom_arrays
+        .iter()
+        .map(|geom| geom.geometry_array.as_ref())
+        .collect();
+    let geometry_array = arrow_interleave(&value_refs, indices)?;
+
+    let distance = interleave_distance_columns(geom_arrays, indices)?;
+
+    let mut result = EvaluatedGeometryArray::try_new(geometry_array, 
sedona_type)?;
+    result.distance = distance;
+    Ok(result)
+}
+
+fn interleave_distance_columns(
+    geom_arrays: &[&EvaluatedGeometryArray],
+    assignments: &[(usize, usize)],
+) -> Result<Option<ColumnarValue>> {
+    // Check consistency and determine if we need array conversion
+    let mut first_value: Option<&ColumnarValue> = None;
+    let mut needs_array = false;
+    let mut all_null = true;
+    let mut first_scalar: Option<&ScalarValue> = None;
+
+    for geom in geom_arrays {
+        match &geom.distance {
+            Some(value) => {
+                if first_value.is_none() {
+                    first_value = Some(value);
+                }
+
+                match value {
+                    ColumnarValue::Array(array) => {
+                        needs_array = true;
+                        if all_null && array.logical_null_count() != 
array.len() {
+                            all_null = false;
+                        }
+                    }
+                    ColumnarValue::Scalar(scalar) => {
+                        if let Some(first) = first_scalar {
+                            if first != scalar {
+                                needs_array = true;
+                            }
+                        } else {
+                            first_scalar = Some(scalar);
+                        }
+                        if !scalar.is_null() {
+                            all_null = false;
+                        }
+                    }
+                }
+            }
+            None => {
+                if first_value.is_some() && !all_null {
+                    return sedona_internal_err!("Inconsistent distance 
metadata across batches");
+                }
+            }
+        }
+    }
+
+    if all_null {
+        return Ok(None);
+    }
+
+    let Some(distance_value) = first_value else {
+        return Ok(None);
+    };
+
+    // If all scalars match, return scalar
+    if !needs_array {
+        if let ColumnarValue::Scalar(value) = distance_value {
+            return Ok(Some(ColumnarValue::Scalar(value.clone())));
+        }
+    }
+
+    // Convert to arrays and interleave
+    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(geom_arrays.len());
+    for geom in geom_arrays {
+        match &geom.distance {
+            Some(ColumnarValue::Array(array)) => arrays.push(array.clone()),
+            Some(ColumnarValue::Scalar(value)) => {
+                
arrays.push(value.to_array_of_size(geom.geometry_array.len())?);
+            }
+            None => {
+                return sedona_internal_err!("Inconsistent distance metadata 
across batches");
+            }
+        }
+    }
+
+    let array_refs: Vec<&dyn Array> = arrays.iter().map(|array| 
array.as_ref()).collect();
+    let array = arrow_interleave(&array_refs, assignments)?;
+    Ok(Some(ColumnarValue::Array(array)))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow_array::{ArrayRef, BinaryArray, Int32Array};
+    use arrow_schema::{DataType, Field, Schema};
+    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+    use sedona_geometry::bounding_box::BoundingBox;
+    use sedona_geometry::interval::IntervalTrait;
+    use sedona_schema::datatypes::WKB_GEOMETRY;
+
+    use crate::{
+        evaluated_batch::{
+            evaluated_batch_stream::in_mem::InMemoryEvaluatedBatchStream,
+            spill::EvaluatedBatchSpillReader,
+        },
+        partitioning::flat::FlatPartitioner,
+    };
+
+    const BUFFER_BYTES: usize = 8 * 1024 * 1024;
+    const TARGET_BATCH_SIZE: usize = 4096;
+
+    fn sample_schema() -> Arc<Schema> {
+        Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]))
+    }
+
+    fn sample_batch(ids: &[i32], wkbs: Vec<Option<Vec<u8>>>) -> 
Result<EvaluatedBatch> {
+        assert_eq!(ids.len(), wkbs.len());
+        let id_array = Arc::new(Int32Array::from(ids.to_vec())) as ArrayRef;
+        let batch = RecordBatch::try_new(sample_schema(), vec![id_array])?;
+        let geom_values: Vec<Option<&[u8]>> = wkbs
+            .iter()
+            .map(|wkb_opt| wkb_opt.as_ref().map(|wkb| wkb.as_slice()))
+            .collect();
+        let geom_array: ArrayRef = Arc::new(BinaryArray::from(geom_values));
+        let geom = EvaluatedGeometryArray::try_new(geom_array, &WKB_GEOMETRY)?;
+        Ok(EvaluatedBatch {
+            batch,
+            geom_array: geom,
+        })
+    }
+
+    fn point_wkb(x: f64, y: f64) -> Vec<u8> {
+        let mut buf = vec![1u8, 1, 0, 0, 0];
+        buf.extend_from_slice(&x.to_le_bytes());
+        buf.extend_from_slice(&y.to_le_bytes());
+        buf
+    }
+
+    fn rect_wkb(min_x: f64, min_y: f64, max_x: f64, max_y: f64) -> Vec<u8> {
+        assert!(min_x <= max_x, "min_x must be <= max_x");
+        assert!(min_y <= max_y, "min_y must be <= max_y");
+        let mut buf = Vec::with_capacity(1 + 4 + 4 + 4 + 5 * 16);
+        buf.push(1u8); // little endian
+        buf.extend_from_slice(&3u32.to_le_bytes()); // polygon type
+        buf.extend_from_slice(&1u32.to_le_bytes()); // single ring
+        buf.extend_from_slice(&5u32.to_le_bytes()); // five coordinates 
(closed ring)
+        let coords = [
+            (min_x, min_y),
+            (max_x, min_y),
+            (max_x, max_y),
+            (min_x, max_y),
+            (min_x, min_y),
+        ];
+        for (x, y) in coords {
+            buf.extend_from_slice(&x.to_le_bytes());
+            buf.extend_from_slice(&y.to_le_bytes());
+        }
+        buf
+    }
+
+    fn read_ids(file: &RefCountedTempFile) -> Result<Vec<i32>> {
+        let mut reader = EvaluatedBatchSpillReader::try_new(file)?;
+        let mut ids = Vec::new();
+        while let Some(batch) = reader.next_batch() {
+            let batch = batch?;
+            let array = batch
+                .batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap();
+            for i in 0..array.len() {
+                ids.push(array.value(i));
+            }
+        }
+        Ok(ids)
+    }
+
+    fn read_batch_row_counts(file: &RefCountedTempFile) -> Result<Vec<usize>> {
+        let mut reader = EvaluatedBatchSpillReader::try_new(file)?;
+        let mut counts = Vec::new();
+        while let Some(batch) = reader.next_batch() {
+            let batch = batch?;
+            counts.push(batch.batch.num_rows());
+        }
+        Ok(counts)
+    }
+
+    fn bbox_limits(bbox: &BoundingBox) -> (f64, f64, f64, f64) {
+        (bbox.x().lo(), bbox.x().hi(), bbox.y().lo(), bbox.y().hi())
+    }
+
+    #[tokio::test]
+    async fn repartition_basic() -> Result<()> {
+        let wkbs = vec![
+            Some(point_wkb(10.0, 10.0)),
+            Some(point_wkb(60.0, 10.0)),
+            Some(point_wkb(150.0, 10.0)),
+        ];
+        let batch = sample_batch(&[0, 1, 2], wkbs)?;
+        let schema = batch.schema();
+        let stream: SendableEvaluatedBatchStream =
+            Box::pin(InMemoryEvaluatedBatchStream::new(schema, vec![batch]));
+
+        let partitions = vec![
+            BoundingBox::xy((0.0, 50.0), (0.0, 50.0)),
+            BoundingBox::xy((50.0, 100.0), (0.0, 50.0)),
+        ];
+        let partitioner = Arc::new(FlatPartitioner::try_new(partitions)?);
+        let runtime_env = Arc::new(RuntimeEnv::default());
+        let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+
+        let result = StreamRepartitioner::builder(
+            runtime_env,
+            partitioner,
+            PartitionedSide::ProbeSide,
+            metrics,
+        )
+        .spill_compression(SpillCompression::Uncompressed)
+        .buffer_bytes_threshold(BUFFER_BYTES)
+        .target_batch_size(TARGET_BATCH_SIZE)
+        .spilled_batch_in_memory_size_threshold(None)
+        .build()
+        .repartition_stream(stream)
+        .await?;
+
+        assert_eq!(result.spill_file_count(), 3);
+        assert_eq!(
+            read_ids(
+                &result
+                    .spilled_partition(SpatialPartition::Regular(0))?
+                    .spill_files()[0]
+            )?,
+            vec![0]
+        );
+        assert_eq!(
+            read_ids(
+                &result
+                    .spilled_partition(SpatialPartition::Regular(1))?
+                    .spill_files()[0]
+            )?,
+            vec![1]
+        );
+        assert_eq!(
+            read_ids(
+                &result
+                    .spilled_partition(SpatialPartition::None)?
+                    .spill_files()[0]
+            )?,
+            vec![2]
+        );
+
+        assert_eq!(
+            bbox_limits(
+                result
+                    .spilled_partition(SpatialPartition::Regular(0))?
+                    .bounding_box()
+                    .unwrap()
+            ),
+            (10.0, 10.0, 10.0, 10.0)
+        );
+        assert_eq!(
+            bbox_limits(
+                result
+                    .spilled_partition(SpatialPartition::Regular(1))?
+                    .bounding_box()
+                    .unwrap()
+            ),
+            (60.0, 60.0, 10.0, 10.0)
+        );
+        assert_eq!(
+            bbox_limits(
+                result
+                    .spilled_partition(SpatialPartition::None)?
+                    .bounding_box()
+                    .unwrap()
+            ),
+            (150.0, 150.0, 10.0, 10.0)
+        );
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn repartition_multi_and_none() -> Result<()> {
+        let wkbs = vec![Some(rect_wkb(25.0, 0.0, 75.0, 20.0)), None];
+        let batch = sample_batch(&[0, 1], wkbs)?;
+        let schema = batch.schema();
+        let stream: SendableEvaluatedBatchStream =
+            Box::pin(InMemoryEvaluatedBatchStream::new(schema, vec![batch]));
+
+        let partitions = vec![
+            BoundingBox::xy((0.0, 50.0), (0.0, 50.0)),
+            BoundingBox::xy((50.0, 100.0), (0.0, 50.0)),
+        ];
+        let partitioner = Arc::new(FlatPartitioner::try_new(partitions)?);
+        let runtime_env = Arc::new(RuntimeEnv::default());
+        let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+
+        let result = StreamRepartitioner::builder(
+            runtime_env,
+            partitioner,
+            PartitionedSide::ProbeSide,
+            metrics,
+        )
+        .spill_compression(SpillCompression::Uncompressed)
+        .buffer_bytes_threshold(BUFFER_BYTES)
+        .target_batch_size(TARGET_BATCH_SIZE)
+        .spilled_batch_in_memory_size_threshold(None)
+        .build()
+        .repartition_stream(stream)
+        .await?;
+
+        assert_eq!(result.spill_file_count(), 2);
+        assert_eq!(
+            read_ids(
+                &result
+                    .spilled_partition(SpatialPartition::Multi)?
+                    .spill_files()[0]
+            )?,
+            vec![0]
+        );
+        assert_eq!(
+            read_ids(
+                &result
+                    .spilled_partition(SpatialPartition::None)?
+                    .spill_files()[0]
+            )?,
+            vec![1]
+        );
+        assert_eq!(
+            bbox_limits(
+                result
+                    .spilled_partition(SpatialPartition::Multi)?
+                    .bounding_box()
+                    .unwrap()
+            ),
+            (25.0, 75.0, 0.0, 20.0)
+        );
+        let none_bound = result
+            .spilled_partition(SpatialPartition::None)?
+            .bounding_box()
+            .expect("Geo stats should exist for None partition");
+        assert!(none_bound.x().is_empty());
+        assert!(none_bound.y().is_empty());
+        Ok(())
+    }
+
+    #[test]
+    fn streaming_repartitioner_finishes_partitions() -> Result<()> {
+        let wkbs = vec![Some(point_wkb(10.0, 10.0)), Some(point_wkb(60.0, 
10.0))];
+        let batch = sample_batch(&[0, 1], wkbs)?;
+        let partitions = vec![
+            BoundingBox::xy((0.0, 50.0), (0.0, 50.0)),
+            BoundingBox::xy((50.0, 100.0), (0.0, 50.0)),
+        ];
+        let partitioner = Arc::new(FlatPartitioner::try_new(partitions)?);
+        let runtime_env = Arc::new(RuntimeEnv::default());
+        let spill_metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 
0);
+        let mut repartitioner = StreamRepartitioner::builder(
+            runtime_env,
+            partitioner,
+            PartitionedSide::ProbeSide,
+            spill_metrics,
+        )
+        .spill_compression(SpillCompression::Uncompressed)
+        .buffer_bytes_threshold(0)
+        .target_batch_size(TARGET_BATCH_SIZE)
+        .spilled_batch_in_memory_size_threshold(None)
+        .build();
+
+        repartitioner.repartition_batch(batch)?;
+        let result = repartitioner.finish()?;
+        assert!(result
+            .spilled_partition(SpatialPartition::None)?
+            .spill_files()
+            .is_empty());
+        assert_eq!(
+            read_ids(
+                &result
+                    .spilled_partition(SpatialPartition::Regular(0))?
+                    .spill_files()[0]
+            )?,
+            vec![0]
+        );
+        assert_eq!(
+            read_ids(
+                &result
+                    .spilled_partition(SpatialPartition::Regular(1))?
+                    .spill_files()[0]
+            )?,
+            vec![1]
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn streaming_repartitioner_buffers_until_threshold() -> Result<()> {
+        let batch_a = sample_batch(&[0], vec![Some(point_wkb(10.0, 10.0))])?;
+        let batch_b = sample_batch(&[1], vec![Some(point_wkb(20.0, 10.0))])?;
+        let partitions = vec![BoundingBox::xy((0.0, 50.0), (0.0, 50.0))];
+        let partitioner = Arc::new(FlatPartitioner::try_new(partitions)?);
+        let runtime_env = Arc::new(RuntimeEnv::default());
+        let spill_metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 
0);
+        let mut repartitioner = StreamRepartitioner::builder(
+            runtime_env,
+            partitioner,
+            PartitionedSide::ProbeSide,
+            spill_metrics,
+        )
+        .spill_compression(SpillCompression::Uncompressed)
+        .buffer_bytes_threshold(usize::MAX)
+        .target_batch_size(TARGET_BATCH_SIZE)
+        .spilled_batch_in_memory_size_threshold(None)
+        .build();
+
+        repartitioner.repartition_batch(batch_a)?;
+        repartitioner.repartition_batch(batch_b)?;
+        let result = repartitioner.finish()?;
+        assert_eq!(
+            read_ids(
+                &result
+                    .spilled_partition(SpatialPartition::Regular(0))?
+                    .spill_files()[0]
+            )?,
+            vec![0, 1]
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn streaming_repartitioner_respects_target_batch_size() -> Result<()> {
+        let batch_a = sample_batch(&[0], vec![Some(point_wkb(10.0, 10.0))])?;
+        let batch_b = sample_batch(&[1], vec![Some(point_wkb(20.0, 10.0))])?;
+        let partitions = vec![BoundingBox::xy((0.0, 50.0), (0.0, 50.0))];
+        let partitioner = Arc::new(FlatPartitioner::try_new(partitions)?);
+        let runtime_env = Arc::new(RuntimeEnv::default());
+        let spill_metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 
0);
+        let mut repartitioner = StreamRepartitioner::builder(
+            runtime_env,
+            partitioner,
+            PartitionedSide::ProbeSide,
+            spill_metrics,
+        )
+        .spill_compression(SpillCompression::Uncompressed)
+        .buffer_bytes_threshold(usize::MAX)
+        .target_batch_size(1)
+        .spilled_batch_in_memory_size_threshold(None)
+        .build();
+
+        repartitioner.repartition_batch(batch_a)?;
+        repartitioner.repartition_batch(batch_b)?;
+        let result = repartitioner.finish()?;
+        let counts = read_batch_row_counts(
+            &result
+                .spilled_partition(SpatialPartition::Regular(0))?
+                .spill_files()[0],
+        )?;
+        assert_eq!(counts, vec![1, 1]);
+        Ok(())
+    }
+
+    fn make_geom_array_with_distance(
+        wkbs: Vec<Vec<u8>>,
+        distance: Option<ColumnarValue>,
+    ) -> Result<EvaluatedGeometryArray> {
+        let geom_array: ArrayRef = Arc::new(BinaryArray::from(
+            wkbs.iter()
+                .map(|wkb| Some(wkb.as_slice()))
+                .collect::<Vec<_>>(),
+        ));
+        let mut geom = EvaluatedGeometryArray::try_new(geom_array, 
&WKB_GEOMETRY)?;
+        geom.distance = distance;
+        Ok(geom)
+    }
+
+    #[test]
+    fn interleave_distance_none() -> Result<()> {
+        let wkbs1 = vec![point_wkb(10.0, 10.0), point_wkb(20.0, 20.0)];
+        let wkbs2 = vec![point_wkb(30.0, 30.0)];
+
+        let geom1 = make_geom_array_with_distance(wkbs1, None)?;
+        let geom2 = make_geom_array_with_distance(wkbs2, None)?;
+
+        let geom_arrays = vec![&geom1, &geom2];
+        let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+        let result = interleave_distance_columns(&geom_arrays, &assignments)?;
+        assert!(result.is_none());
+        Ok(())
+    }
+
+    #[test]
+    fn interleave_distance_uniform_scalar() -> Result<()> {
+        let wkbs1 = vec![point_wkb(10.0, 10.0), point_wkb(20.0, 20.0)];
+        let wkbs2 = vec![point_wkb(30.0, 30.0)];
+
+        let scalar = ScalarValue::Float64(Some(5.0));
+        let geom1 =
+            make_geom_array_with_distance(wkbs1, 
Some(ColumnarValue::Scalar(scalar.clone())))?;
+        let geom2 =
+            make_geom_array_with_distance(wkbs2, 
Some(ColumnarValue::Scalar(scalar.clone())))?;
+
+        let geom_arrays = vec![&geom1, &geom2];
+        let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+        let result = interleave_distance_columns(&geom_arrays, &assignments)?;
+        assert!(matches!(result, Some(ColumnarValue::Scalar(_))));
+        if let Some(ColumnarValue::Scalar(value)) = result {
+            assert_eq!(value, ScalarValue::Float64(Some(5.0)));
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn interleave_distance_different_scalars() -> Result<()> {
+        use arrow_array::Float64Array;
+
+        let wkbs1 = vec![point_wkb(10.0, 10.0), point_wkb(20.0, 20.0)];
+        let wkbs2 = vec![point_wkb(30.0, 30.0)];
+
+        let scalar1 = ScalarValue::Float64(Some(5.0));
+        let scalar2 = ScalarValue::Float64(Some(10.0));
+        let geom1 = make_geom_array_with_distance(wkbs1, 
Some(ColumnarValue::Scalar(scalar1)))?;
+        let geom2 = make_geom_array_with_distance(wkbs2, 
Some(ColumnarValue::Scalar(scalar2)))?;
+
+        let geom_arrays = vec![&geom1, &geom2];
+        let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+        let result = interleave_distance_columns(&geom_arrays, &assignments)?;
+        assert!(matches!(result, Some(ColumnarValue::Array(_))));
+        if let Some(ColumnarValue::Array(array)) = result {
+            let float_array = 
array.as_any().downcast_ref::<Float64Array>().unwrap();
+            assert_eq!(float_array.len(), 3);
+            assert_eq!(float_array.value(0), 5.0);
+            assert_eq!(float_array.value(1), 10.0);
+            assert_eq!(float_array.value(2), 5.0);
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn interleave_distance_arrays() -> Result<()> {
+        use arrow_array::Float64Array;
+
+        let wkbs1 = vec![point_wkb(10.0, 10.0), point_wkb(20.0, 20.0)];
+        let wkbs2 = vec![point_wkb(30.0, 30.0)];
+
+        let array1: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0]));
+        let array2: ArrayRef = Arc::new(Float64Array::from(vec![3.0]));
+        let geom1 = make_geom_array_with_distance(wkbs1, 
Some(ColumnarValue::Array(array1)))?;
+        let geom2 = make_geom_array_with_distance(wkbs2, 
Some(ColumnarValue::Array(array2)))?;
+
+        let geom_arrays = vec![&geom1, &geom2];
+        let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+        let result = interleave_distance_columns(&geom_arrays, &assignments)?;
+        assert!(matches!(result, Some(ColumnarValue::Array(_))));
+        if let Some(ColumnarValue::Array(array)) = result {
+            let float_array = 
array.as_any().downcast_ref::<Float64Array>().unwrap();
+            assert_eq!(float_array.len(), 3);
+            assert_eq!(float_array.value(0), 1.0);
+            assert_eq!(float_array.value(1), 3.0);
+            assert_eq!(float_array.value(2), 2.0);
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn interleave_distance_mixed_scalar_and_array() -> Result<()> {
+        use arrow_array::Float64Array;
+
+        let wkbs1 = vec![point_wkb(10.0, 10.0), point_wkb(20.0, 20.0)];
+        let wkbs2 = vec![point_wkb(30.0, 30.0)];
+
+        let scalar = ScalarValue::Float64(Some(5.0));
+        let array: ArrayRef = Arc::new(Float64Array::from(vec![10.0]));
+        let geom1 = make_geom_array_with_distance(wkbs1, 
Some(ColumnarValue::Scalar(scalar)))?;
+        let geom2 = make_geom_array_with_distance(wkbs2, 
Some(ColumnarValue::Array(array)))?;
+
+        let geom_arrays = vec![&geom1, &geom2];
+        let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+        let result = interleave_distance_columns(&geom_arrays, &assignments)?;
+        assert!(matches!(result, Some(ColumnarValue::Array(_))));
+        if let Some(ColumnarValue::Array(array)) = result {
+            let float_array = 
array.as_any().downcast_ref::<Float64Array>().unwrap();
+            assert_eq!(float_array.len(), 3);
+            assert_eq!(float_array.value(0), 5.0);
+            assert_eq!(float_array.value(1), 10.0);
+            assert_eq!(float_array.value(2), 5.0);
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn interleave_evaluated_batch_empty_assignments() -> Result<()> {
+        let batch_a = sample_batch(&[0], vec![Some(point_wkb(10.0, 10.0))])?;
+        let batch_b = sample_batch(&[1], vec![Some(point_wkb(20.0, 20.0))])?;
+        let record_batches = vec![&batch_a.batch, &batch_b.batch];
+        let geom_arrays = vec![&batch_a.geom_array, &batch_b.geom_array];
+
+        let result = interleave_evaluated_batch(&record_batches, &geom_arrays, 
&[])?;
+        assert_eq!(result.batch.num_rows(), 0);
+        assert_eq!(result.geom_array.geometry_array.len(), 0);
+        assert!(result.geom_array.rects.is_empty());
+        assert!(result.geom_array.distance.is_none());
+        Ok(())
+    }
+
+    #[test]
+    fn interleave_distance_inconsistent_metadata() -> Result<()> {
+        let wkbs1 = vec![point_wkb(10.0, 10.0)];
+        let wkbs2 = vec![point_wkb(20.0, 20.0)];
+
+        let scalar = ScalarValue::Float64(Some(5.0));
+        let geom1 = make_geom_array_with_distance(wkbs1, 
Some(ColumnarValue::Scalar(scalar)))?;
+        let geom2 = make_geom_array_with_distance(wkbs2, None)?;
+
+        let geom_arrays = vec![&geom1, &geom2];
+        let assignments = vec![(0, 0), (1, 0)];
+
+        let result = interleave_distance_columns(&geom_arrays, &assignments);
+        assert!(result.is_err());
+        if let Err(e) = result {
+            assert!(e.to_string().contains("Inconsistent distance metadata"));
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn interleave_binary_view_array() -> Result<()> {
+        use arrow_array::BinaryViewArray;
+        use sedona_schema::crs::Crs;
+        use sedona_schema::datatypes::{Edges, SedonaType};
+        let wkb_view_geometry = SedonaType::WkbView(Edges::Planar, Crs::None);
+
+        let wkbs1 = [point_wkb(10.0, 10.0), point_wkb(20.0, 20.0)];
+        let wkbs2 = [point_wkb(30.0, 30.0)];
+
+        // Create BinaryViewArray
+        let array1 = BinaryViewArray::from_iter(wkbs1.iter().map(|w| 
Some(w.as_slice())));
+        let array2 = BinaryViewArray::from_iter(wkbs2.iter().map(|w| 
Some(w.as_slice())));
+
+        let geom1 = EvaluatedGeometryArray::try_new(Arc::new(array1), 
&wkb_view_geometry)?;
+        let geom2 = EvaluatedGeometryArray::try_new(Arc::new(array2), 
&wkb_view_geometry)?;
+
+        let geom_arrays = vec![&geom1, &geom2];
+        let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+        // Create dummy record batches
+        let batch1 = RecordBatch::try_new(
+            sample_schema(),
+            vec![Arc::new(Int32Array::from(vec![1, 2]))],
+        )?;
+        let batch2 =
+            RecordBatch::try_new(sample_schema(), 
vec![Arc::new(Int32Array::from(vec![3]))])?;
+        let record_batches = vec![&batch1, &batch2];
+
+        let result = interleave_evaluated_batch(&record_batches, &geom_arrays, 
&assignments)?;
+
+        // Check if the result geometry array is BinaryViewArray
+        let geom_array = result.geom_array.geometry_array;
+        assert!(geom_array
+            .as_any()
+            .downcast_ref::<BinaryViewArray>()
+            .is_some());
+
+        // Check values
+        let view_array = geom_array
+            .as_any()
+            .downcast_ref::<BinaryViewArray>()
+            .unwrap();
+        assert_eq!(view_array.len(), 3);
+        assert_eq!(view_array.value(0), wkbs1[0].as_slice());
+        assert_eq!(view_array.value(1), wkbs2[0].as_slice());
+        assert_eq!(view_array.value(2), wkbs1[1].as_slice());
+
+        Ok(())
+    }
+
+    #[test]
+    fn interleave_distance_mixed_none_and_null() -> Result<()> {
+        use arrow_array::Float64Array;
+
+        let wkbs1 = vec![point_wkb(10.0, 10.0)];
+        let wkbs2 = vec![point_wkb(20.0, 20.0)];
+        let wkbs3 = vec![point_wkb(30.0, 30.0)];
+
+        let null_array = Arc::new(Float64Array::new_null(1));
+        let ega1 = make_geom_array_with_distance(wkbs1, 
Some(ColumnarValue::Array(null_array)))?;
+
+        let null_scalar = ScalarValue::Float64(None);
+        let ega2 = make_geom_array_with_distance(wkbs2, 
Some(ColumnarValue::Scalar(null_scalar)))?;
+
+        let ega3 = make_geom_array_with_distance(wkbs3, None)?;
+
+        let vec_ega = vec![&ega1, &ega2, &ega3];
+        let assignments = vec![(0, 0), (1, 0), (2, 0)];
+
+        let result = interleave_distance_columns(&vec_ega, &assignments)?;
+        assert!(result.is_none());
+        Ok(())
+    }
+}
diff --git a/rust/sedona-spatial-join/src/partitioning/util.rs 
b/rust/sedona-spatial-join/src/partitioning/util.rs
index a5f57304..20a93801 100644
--- a/rust/sedona-spatial-join/src/partitioning/util.rs
+++ b/rust/sedona-spatial-join/src/partitioning/util.rs
@@ -63,6 +63,13 @@ pub(crate) fn bbox_to_geo_rect(bbox: &BoundingBox) -> 
Result<Option<Rect<f32>>>
     }
 }
 
+/// Convert a [`Rect<f32>`] into a [`BoundingBox`].
+pub(crate) fn geo_rect_to_bbox(rect: &Rect<f32>) -> BoundingBox {
+    let min = rect.min();
+    let max = rect.max();
+    BoundingBox::xy((min.x as f64, max.x as f64), (min.y as f64, max.y as f64))
+}
+
 /// Creates a `Rect` from four coordinate values representing the bounding box.
 ///
 /// This is a convenience function that constructs a `geo::Rect` from 
individual
diff --git a/rust/sedona-spatial-join/src/stream.rs 
b/rust/sedona-spatial-join/src/stream.rs
index 6cf175c2..8451ff2d 100644
--- a/rust/sedona-spatial-join/src/stream.rs
+++ b/rust/sedona-spatial-join/src/stream.rs
@@ -413,7 +413,7 @@ impl SpatialJoinStream {
             let mut analyzer = AnalyzeAccumulator::new(WKB_GEOMETRY, 
WKB_GEOMETRY);
             let geom_array = &probe_evaluated_batch.geom_array;
             for wkb in geom_array.wkbs().iter().flatten() {
-                analyzer.update_statistics(wkb, wkb.buf().len())?;
+                analyzer.update_statistics(wkb)?;
             }
             let stats = analyzer.finish();
             spatial_index.merge_probe_stats(stats);
diff --git a/rust/sedona-spatial-join/src/utils/spill.rs 
b/rust/sedona-spatial-join/src/utils/spill.rs
index fef585a5..116d5043 100644
--- a/rust/sedona-spatial-join/src/utils/spill.rs
+++ b/rust/sedona-spatial-join/src/utils/spill.rs
@@ -39,7 +39,7 @@ pub(crate) struct RecordBatchSpillWriter {
     in_progress_file: RefCountedTempFile,
     writer: StreamWriter<File>,
     metrics: SpillMetrics,
-    batch_size_threshold: Option<usize>,
+    batch_in_memory_size_threshold: Option<usize>,
     gc_view_arrays: bool,
 }
 
@@ -50,7 +50,7 @@ impl RecordBatchSpillWriter {
         request_description: &str,
         compression: SpillCompression,
         metrics: SpillMetrics,
-        batch_size_threshold: Option<usize>,
+        batch_in_memory_size_threshold: Option<usize>,
     ) -> Result<Self> {
         let in_progress_file = 
env.disk_manager.create_tmp_file(request_description)?;
         let file = File::create(in_progress_file.path())?;
@@ -67,7 +67,7 @@ impl RecordBatchSpillWriter {
             in_progress_file,
             writer,
             metrics,
-            batch_size_threshold,
+            batch_in_memory_size_threshold,
             gc_view_arrays,
         })
     }
@@ -101,7 +101,7 @@ impl RecordBatchSpillWriter {
     }
 
     fn calculate_rows_per_split(&self, batch: &RecordBatch, num_rows: usize) 
-> Result<usize> {
-        let Some(threshold) = self.batch_size_threshold else {
+        let Some(threshold) = self.batch_in_memory_size_threshold else {
             return Ok(num_rows);
         };
         if threshold == 0 {

Reply via email to