This is an automated email from the ASF dual-hosted git repository.

kontinuation pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new e31d5540 chore(rust/sedona-spatial-join): Integrate bounding box 
sampler and spilling support into build side collector (#542)
e31d5540 is described below

commit e31d55402c1bd8a820469c381d954f2e7c1c9afe
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Tue Jan 27 10:29:59 2026 +0800

    chore(rust/sedona-spatial-join): Integrate bounding box sampler and 
spilling support into build side collector (#542)
    
    This is an intermediate step that integrates multiple pieces together. The 
overall behavior is unchanged when the memory limit of DataFusion is not set 
(which is the default case). The collected bounding box samples are unused for 
now; the performance overhead of sampling boxes is negligible according to our 
tests. Committing this won't drive the project into an unstable or unreleasable 
state.
    
    The next step will be adding a partitioned spatial index provider and 
integrating the spatial partitioner into the main spatial join workflow, but it 
will effectively only work on a single partition for now. This will also be a 
non-breaking change.
---
 Cargo.lock                                         |   1 +
 Cargo.toml                                         |   1 +
 rust/sedona-common/src/option.rs                   |  97 +++++
 rust/sedona-geometry/src/bounding_box.rs           |  15 +
 rust/sedona-spatial-join/Cargo.toml                |   1 +
 rust/sedona-spatial-join/src/build_index.rs        |  28 +-
 rust/sedona-spatial-join/src/exec.rs               |  22 +-
 .../src/index/build_side_collector.rs              | 431 +++++++++++++++++++--
 rust/sedona-spatial-join/src/optimizer.rs          |  20 +-
 9 files changed, 571 insertions(+), 45 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 22f5e45a..bc6799a9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5404,6 +5404,7 @@ dependencies = [
  "geo-traits",
  "geo-types",
  "geos",
+ "log",
  "once_cell",
  "parking_lot",
  "pin-project-lite",
diff --git a/Cargo.toml b/Cargo.toml
index 04dede88..85ff5f43 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -90,6 +90,7 @@ datafusion-physical-expr = { version = "51.0.0" }
 datafusion-physical-plan = { version = "51.0.0" }
 dirs = "6.0.0"
 env_logger = "0.11"
+log = "^0.4"
 fastrand = "2.0"
 futures = "0.3"
 pin-project-lite = "0.2"
diff --git a/rust/sedona-common/src/option.rs b/rust/sedona-common/src/option.rs
index bc74acf7..e5290688 100644
--- a/rust/sedona-common/src/option.rs
+++ b/rust/sedona-common/src/option.rs
@@ -71,12 +71,94 @@ config_namespace! {
         /// Include tie-breakers in KNN join results when there are tied 
distances
         pub knn_include_tie_breakers: bool, default = false
 
+        /// Maximum number of sample bounding boxes collected from the index 
side for partitioning the
+        /// data when running out-of-core spatial join
+        pub max_index_side_bbox_samples: usize, default = 10000
+
+        /// Minimum number of sample bounding boxes collected from the index 
side for partitioning the
+        /// data when running out-of-core spatial join
+        pub min_index_side_bbox_samples: usize, default = 1000
+
+        /// Target sampling rate for sampling bounding boxes from the index 
side for partitioning the
+        /// data when running out-of-core spatial join
+        pub target_index_side_bbox_sampling_rate: f64, default = 0.01
+
+        /// The in memory size threshold of batches written to spill files. If 
the spilled batch is
+        /// too large, it will be broken into several smaller parts before 
written to spill files.
+        /// This is for avoiding overshooting the memory limit when reading 
spilled batches from
+        /// spill files. Specify 0 for unlimited size.
+        pub spilled_batch_in_memory_size_threshold: usize, default = 0
+
         /// The minimum number of geometry pairs per chunk required to enable 
parallel
         /// refinement during the spatial join operation. When the refinement 
phase has
         /// fewer geometry pairs than this threshold, it will run sequentially 
instead
         /// of spawning parallel tasks. Higher values reduce parallelization 
overhead
         /// for small datasets, while lower values enable more fine-grained 
parallelism.
         pub parallel_refinement_chunk_size: usize, default = 8192
+
+        /// Options for debugging or testing spatial join
+        pub debug : SpatialJoinDebugOptions, default = 
SpatialJoinDebugOptions::default()
+    }
+}
+
+config_namespace! {
+    /// Configurations for debugging or testing spatial join
+    pub struct SpatialJoinDebugOptions {
+        /// Number of spatial partitions to use for spatial join
+        pub num_spatial_partitions: NumSpatialPartitionsConfig, default = 
NumSpatialPartitionsConfig::Auto
+
+        /// The amount of memory for intermittent usage such as spatially 
repartitioning the data
+        pub memory_for_intermittent_usage: Option<usize>, default = None
+
+        /// Force spilling while collecting the build side or not
+        pub force_spill: bool, default = false
+
+        /// Seed for random processes in the spatial join for testing purpose
+        pub random_seed: Option<u64>, default = None
+    }
+}
+
+#[derive(Debug, PartialEq, Clone, Copy)]
+pub enum NumSpatialPartitionsConfig {
+    /// Automatically determine the number of spatial partitions
+    Auto,
+
+    /// Use a fixed number of spatial partitions
+    Fixed(usize),
+}
+
+impl ConfigField for NumSpatialPartitionsConfig {
+    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) 
{
+        let value = match self {
+            NumSpatialPartitionsConfig::Auto => "auto".into(),
+            NumSpatialPartitionsConfig::Fixed(n) => format!("{n}"),
+        };
+        v.some(key, value, description);
+    }
+
+    fn set(&mut self, _key: &str, value: &str) -> Result<()> {
+        let value = value.to_lowercase();
+        let config = match value.as_str() {
+            "auto" => NumSpatialPartitionsConfig::Auto,
+            _ => match value.parse::<usize>() {
+                Ok(n) => {
+                    if n > 0 {
+                        NumSpatialPartitionsConfig::Fixed(n)
+                    } else {
+                        return 
Err(datafusion_common::DataFusionError::Configuration(
+                            "num_spatial_partitions must be greater than 
0".to_string(),
+                        ));
+                    }
+                }
+                Err(_) => {
+                    return 
Err(datafusion_common::DataFusionError::Configuration(format!(
+                        "Unknown num_spatial_partitions config: {value}. 
Expected formats: auto, <number>"
+                    )));
+                }
+            },
+        };
+        *self = config;
+        Ok(())
     }
 }
 
@@ -421,4 +503,19 @@ mod tests {
         assert!(index_type.set("", "invalid").is_err());
         assert!(index_type.set("", "").is_err());
     }
+
+    #[test]
+    fn test_num_spatial_partitions_config_parsing() {
+        let mut config = NumSpatialPartitionsConfig::Auto;
+
+        assert!(config.set("", "auto").is_ok());
+        assert_eq!(config, NumSpatialPartitionsConfig::Auto);
+
+        assert!(config.set("", "10").is_ok());
+        assert_eq!(config, NumSpatialPartitionsConfig::Fixed(10));
+
+        assert!(config.set("", "0").is_err());
+        assert!(config.set("", "invalid").is_err());
+        assert!(config.set("", "fixed[10]").is_err());
+    }
 }
diff --git a/rust/sedona-geometry/src/bounding_box.rs 
b/rust/sedona-geometry/src/bounding_box.rs
index 7a018fb5..2da191fd 100644
--- a/rust/sedona-geometry/src/bounding_box.rs
+++ b/rust/sedona-geometry/src/bounding_box.rs
@@ -69,6 +69,16 @@ impl BoundingBox {
         }
     }
 
+    /// Create an empty BoundingBox
+    pub fn empty() -> Self {
+        Self {
+            x: WraparoundInterval::empty(),
+            y: Interval::empty(),
+            z: None,
+            m: None,
+        }
+    }
+
     /// The x interval
     pub fn x(&self) -> &WraparoundInterval {
         &self.x
@@ -91,6 +101,11 @@ impl BoundingBox {
         &self.m
     }
 
+    /// Check whether this BoundingBox is empty
+    pub fn is_empty(&self) -> bool {
+        self.x.is_empty() || self.y.is_empty()
+    }
+
     /// Calculate intersection with another BoundingBox
     ///
     /// Returns true if this bounding box may intersect other or false 
otherwise. This
diff --git a/rust/sedona-spatial-join/Cargo.toml 
b/rust/sedona-spatial-join/Cargo.toml
index 010c2ead..322ec572 100644
--- a/rust/sedona-spatial-join/Cargo.toml
+++ b/rust/sedona-spatial-join/Cargo.toml
@@ -66,6 +66,7 @@ geo-index = { workspace = true }
 geos = { workspace = true }
 float_next_after = { workspace = true }
 fastrand = { workspace = true }
+log = { workspace = true }
 
 [dev-dependencies]
 criterion = { workspace = true }
diff --git a/rust/sedona-spatial-join/src/build_index.rs 
b/rust/sedona-spatial-join/src/build_index.rs
index 5a171007..f3cbb34b 100644
--- a/rust/sedona-spatial-join/src/build_index.rs
+++ b/rust/sedona-spatial-join/src/build_index.rs
@@ -46,7 +46,12 @@ pub async fn build_index(
     join_type: JoinType,
     probe_threads_count: usize,
     metrics: ExecutionPlanMetricsSet,
+    seed: u64,
 ) -> Result<SpatialIndex> {
+    log::debug!(
+        "Building spatial index for running spatial join, seed = {}",
+        seed
+    );
     let session_config = context.session_config();
     let sedona_options = session_config
         .options()
@@ -55,10 +60,14 @@ pub async fn build_index(
         .cloned()
         .unwrap_or_default();
     let concurrent = 
sedona_options.spatial_join.concurrent_build_side_collection;
+    let runtime_env = context.runtime_env();
+    let spill_compression = session_config.spill_compression();
     let memory_pool = context.memory_pool();
     let collector = BuildSideBatchesCollector::new(
         spatial_predicate.clone(),
         sedona_options.spatial_join.clone(),
+        Arc::clone(&runtime_env),
+        spill_compression,
     );
     let num_partitions = build_streams.len();
     let mut collect_metrics_vec = Vec::with_capacity(num_partitions);
@@ -72,12 +81,23 @@ pub async fn build_index(
     }
 
     let build_partitions = collector
-        .collect_all(build_streams, reservations, collect_metrics_vec, 
concurrent)
+        .collect_all(
+            build_streams,
+            reservations,
+            collect_metrics_vec,
+            concurrent,
+            seed,
+        )
         .await?;
 
-    let contains_external_stream = build_partitions
-        .iter()
-        .any(|partition| partition.build_side_batch_stream.is_external());
+    let contains_external_stream = build_partitions.iter().any(|partition| {
+        // Access fields to avoid unused variable warnings. Will be removed 
when out-of-core
+        // spatial join (https://github.com/apache/sedona-db/issues/436) is 
fully implemented.
+        let _ = partition.num_rows;
+        let _ = partition.bbox_samples;
+        let _ = partition.estimated_spatial_index_memory_usage;
+        partition.build_side_batch_stream.is_external()
+    });
     if !contains_external_stream {
         let mut index_builder = SpatialIndexBuilder::new(
             build_schema,
diff --git a/rust/sedona-spatial-join/src/exec.rs 
b/rust/sedona-spatial-join/src/exec.rs
index 43b73290..50cbd171 100644
--- a/rust/sedona-spatial-join/src/exec.rs
+++ b/rust/sedona-spatial-join/src/exec.rs
@@ -33,6 +33,7 @@ use datafusion_physical_plan::{
     PlanProperties,
 };
 use parking_lot::Mutex;
+use sedona_common::SpatialJoinOptions;
 
 use crate::{
     build_index::build_index,
@@ -137,6 +138,8 @@ pub struct SpatialJoinExec {
     /// Indicates if this SpatialJoin was converted from a HashJoin
     /// When true, we preserve HashJoin's equivalence properties and 
partitioning
     converted_from_hash_join: bool,
+    /// A random seed for making random procedures in spatial join 
deterministic
+    seed: u64,
 }
 
 impl SpatialJoinExec {
@@ -148,11 +151,15 @@ impl SpatialJoinExec {
         filter: Option<JoinFilter>,
         join_type: &JoinType,
         projection: Option<Vec<usize>>,
+        options: &SpatialJoinOptions,
     ) -> Result<Self> {
-        Self::try_new_with_options(left, right, on, filter, join_type, 
projection, false)
+        Self::try_new_with_options(
+            left, right, on, filter, join_type, projection, options, false,
+        )
     }
 
     /// Create a new SpatialJoinExec with additional options
+    #[allow(clippy::too_many_arguments)]
     pub fn try_new_with_options(
         left: Arc<dyn ExecutionPlan>,
         right: Arc<dyn ExecutionPlan>,
@@ -160,6 +167,7 @@ impl SpatialJoinExec {
         filter: Option<JoinFilter>,
         join_type: &JoinType,
         projection: Option<Vec<usize>>,
+        options: &SpatialJoinOptions,
         converted_from_hash_join: bool,
     ) -> Result<Self> {
         let left_schema = left.schema();
@@ -179,6 +187,11 @@ impl SpatialJoinExec {
             converted_from_hash_join,
         )?;
 
+        let seed = options
+            .debug
+            .random_seed
+            .unwrap_or(fastrand::u64(0..0xFFFF));
+
         Ok(SpatialJoinExec {
             left,
             right,
@@ -192,6 +205,7 @@ impl SpatialJoinExec {
             cache,
             once_async_spatial_index: Arc::new(Mutex::new(None)),
             converted_from_hash_join,
+            seed,
         })
     }
 
@@ -419,6 +433,7 @@ impl ExecutionPlan for SpatialJoinExec {
             cache: self.cache.clone(),
             once_async_spatial_index: Arc::new(Mutex::new(None)),
             converted_from_hash_join: self.converted_from_hash_join,
+            seed: self.seed,
         }))
     }
 
@@ -472,6 +487,7 @@ impl ExecutionPlan for SpatialJoinExec {
                                 self.join_type,
                                 probe_thread_count,
                                 self.metrics.clone(),
+                                self.seed,
                             ))
                         })?
                 };
@@ -563,6 +579,7 @@ impl SpatialJoinExec {
                         self.join_type,
                         probe_thread_count,
                         self.metrics.clone(),
+                        self.seed,
                     ))
                 })?
         };
@@ -1330,7 +1347,7 @@ mod tests {
         let sql = "SELECT * FROM L LEFT JOIN R ON ST_Intersects(L.geometry, 
R.geometry)";
 
         // Create SpatialJoinExec plan
-        let ctx = setup_context(Some(options), batch_size)?;
+        let ctx = setup_context(Some(options.clone()), batch_size)?;
         ctx.register_table("L", mem_table_left.clone())?;
         ctx.register_table("R", mem_table_right.clone())?;
         let df = ctx.sql(sql).await?;
@@ -1345,6 +1362,7 @@ mod tests {
             original_exec.filter.clone(),
             &join_type,
             None,
+            &options,
         )?;
 
         // Create NestedLoopJoinExec plan for comparison
diff --git a/rust/sedona-spatial-join/src/index/build_side_collector.rs 
b/rust/sedona-spatial-join/src/index/build_side_collector.rs
index 89537592..646c6be2 100644
--- a/rust/sedona-spatial-join/src/index/build_side_collector.rs
+++ b/rust/sedona-spatial-join/src/index/build_side_collector.rs
@@ -17,12 +17,17 @@
 
 use std::sync::Arc;
 
-use datafusion_common::Result;
+use datafusion::config::SpillCompression;
+use datafusion_common::{DataFusionError, Result};
 use datafusion_common_runtime::JoinSet;
-use datafusion_execution::{memory_pool::MemoryReservation, 
SendableRecordBatchStream};
-use datafusion_physical_plan::metrics::{self, ExecutionPlanMetricsSet, 
MetricBuilder};
+use datafusion_execution::{
+    memory_pool::MemoryReservation, runtime_env::RuntimeEnv, 
SendableRecordBatchStream,
+};
+use datafusion_physical_plan::metrics::{
+    self, ExecutionPlanMetricsSet, MetricBuilder, SpillMetrics,
+};
 use futures::StreamExt;
-use sedona_common::SpatialJoinOptions;
+use sedona_common::{sedona_internal_err, SpatialJoinOptions};
 use sedona_expr::statistics::GeoStatistics;
 use sedona_functions::st_analyze_agg::AnalyzeAccumulator;
 use sedona_schema::datatypes::WKB_GEOMETRY;
@@ -30,27 +35,38 @@ use sedona_schema::datatypes::WKB_GEOMETRY;
 use crate::{
     evaluated_batch::{
         evaluated_batch_stream::{
-            evaluate::create_evaluated_build_stream, 
in_mem::InMemoryEvaluatedBatchStream,
-            SendableEvaluatedBatchStream,
+            evaluate::create_evaluated_build_stream, 
external::ExternalEvaluatedBatchStream,
+            in_mem::InMemoryEvaluatedBatchStream, SendableEvaluatedBatchStream,
         },
+        spill::EvaluatedBatchSpillWriter,
         EvaluatedBatch,
     },
     index::SpatialIndexBuilder,
     operand_evaluator::{create_operand_evaluator, OperandEvaluator},
-    SpatialPredicate,
+    spatial_predicate::SpatialPredicate,
+    utils::bbox_sampler::{BoundingBoxSampler, BoundingBoxSamples},
 };
 
-/// Safety buffer applied when pre-growing build-side reservations to leave 
headroom for
-/// auxiliary structures beyond the build batches themselves.
-/// 20% was chosen as a conservative margin.
-const BUILD_SIDE_RESERVATION_BUFFER_RATIO: f64 = 0.20;
-
 pub(crate) struct BuildPartition {
+    pub num_rows: usize,
     pub build_side_batch_stream: SendableEvaluatedBatchStream,
     pub geo_statistics: GeoStatistics,
 
-    /// Memory reservation for tracking the memory usage of the build partition
-    /// Cleared on `BuildPartition` drop
+    /// Subset of build-side bounding boxes kept for building partitioners 
(e.g. KDB partitioner)
+    /// when the indexed data cannot be fully loaded into memory.
+    pub bbox_samples: BoundingBoxSamples,
+
+    /// The estimated memory usage of building spatial index from all the data
+    /// collected in this partition. The estimated memory used by the global
+    /// spatial index will be the sum of these per-partition estimation.
+    pub estimated_spatial_index_memory_usage: usize,
+
+    /// Memory reservation for tracking the maximum memory usage when 
collecting
+    /// the build side. This reservation won't be freed even when spilling is
+    /// triggered. We deliberately only grow the memory reservation to probe
+    /// the amount of memory available for loading spatial index into memory.
+    /// The size of this reservation will be used to determine the maximum 
size of
+    /// each spatial partition, as well as how many spatial partitions to 
create.
     pub reservation: MemoryReservation,
 }
 
@@ -63,8 +79,11 @@ pub(crate) struct BuildSideBatchesCollector {
     spatial_predicate: SpatialPredicate,
     spatial_join_options: SpatialJoinOptions,
     evaluator: Arc<dyn OperandEvaluator>,
+    runtime_env: Arc<RuntimeEnv>,
+    spill_compression: SpillCompression,
 }
 
+#[derive(Clone)]
 pub(crate) struct CollectBuildSideMetrics {
     /// Number of batches collected
     num_batches: metrics::Count,
@@ -77,6 +96,8 @@ pub(crate) struct CollectBuildSideMetrics {
     /// Total time taken to collect and process the build side batches. This 
does not include the time awaiting
     /// for batches from the input stream.
     time_taken: metrics::Time,
+    /// Spill metrics of build partitions collecting phase
+    spill_metrics: SpillMetrics,
 }
 
 impl CollectBuildSideMetrics {
@@ -88,6 +109,7 @@ impl CollectBuildSideMetrics {
                 .gauge("build_input_total_size_bytes", partition),
             time_taken: MetricBuilder::new(metrics)
                 .subset_time("build_input_collection_time", partition),
+            spill_metrics: SpillMetrics::new(metrics, partition),
         }
     }
 }
@@ -96,41 +118,95 @@ impl BuildSideBatchesCollector {
     pub fn new(
         spatial_predicate: SpatialPredicate,
         spatial_join_options: SpatialJoinOptions,
+        runtime_env: Arc<RuntimeEnv>,
+        spill_compression: SpillCompression,
     ) -> Self {
         let evaluator = create_operand_evaluator(&spatial_predicate, 
spatial_join_options.clone());
         BuildSideBatchesCollector {
             spatial_predicate,
             spatial_join_options,
             evaluator,
+            runtime_env,
+            spill_compression,
         }
     }
 
+    /// Collect build-side batches from the stream into a `BuildPartition`.
+    ///
+    /// This method grows the given memory reservation as if an in-memory 
spatial
+    /// index will be built for all collected batches. If the reservation 
cannot
+    /// be grown, batches are spilled to disk and the reservation is left at 
its
+    /// peak value.
+    ///
+    /// The reservation represents memory available for loading the spatial 
index.
+    /// Across all partitions, the sum of their reservations forms a soft 
memory
+    /// cap for subsequent spatial join operations. Reservations grown here are
+    /// not released until the spatial join operator completes.
     pub async fn collect(
         &self,
         mut stream: SendableEvaluatedBatchStream,
         mut reservation: MemoryReservation,
+        mut bbox_sampler: BoundingBoxSampler,
         metrics: &CollectBuildSideMetrics,
     ) -> Result<BuildPartition> {
+        let mut spill_writer_opt = None;
         let mut in_mem_batches: Vec<EvaluatedBatch> = Vec::new();
+        let mut total_num_rows = 0;
+        let mut total_size_bytes = 0;
         let mut analyzer = AnalyzeAccumulator::new(WKB_GEOMETRY, WKB_GEOMETRY);
 
+        // Reserve memory for holding bbox samples. This should be a small 
reservation.
+        // We simply return error if the reservation cannot be fulfilled, 
since there's
+        // too little memory for the collector and proceeding will risk 
overshooting the
+        // memory limit.
+        reservation.try_grow(bbox_sampler.estimate_maximum_memory_usage())?;
+
         while let Some(evaluated_batch) = stream.next().await {
             let build_side_batch = evaluated_batch?;
             let _timer = metrics.time_taken.timer();
 
-            // Process the record batch and create a BuildSideBatch
             let geom_array = &build_side_batch.geom_array;
             for wkb in geom_array.wkbs().iter().flatten() {
-                analyzer.update_statistics(wkb)?;
+                let summary = sedona_geometry::analyze::analyze_geometry(wkb)
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                if !summary.bbox.is_empty() {
+                    bbox_sampler.add_bbox(&summary.bbox);
+                }
+                analyzer.ingest_geometry_summary(&summary);
             }
 
+            let num_rows = build_side_batch.num_rows();
             let in_mem_size = build_side_batch.in_mem_size()?;
+            total_num_rows += num_rows;
+            total_size_bytes += in_mem_size;
+
             metrics.num_batches.add(1);
-            metrics.num_rows.add(build_side_batch.num_rows());
+            metrics.num_rows.add(num_rows);
             metrics.total_size_bytes.add(in_mem_size);
 
-            reservation.try_grow(in_mem_size)?;
-            in_mem_batches.push(build_side_batch);
+            match &mut spill_writer_opt {
+                None => {
+                    // Collected batches are in memory, no spilling happened 
for this partition before. We'll try
+                    // storing this batch in memory first, and switch to 
writing everything to disk if we fail
+                    // to grow the reservation.
+                    in_mem_batches.push(build_side_batch);
+                    if let Err(e) = reservation.try_grow(in_mem_size) {
+                        log::debug!(
+                            "Failed to grow reservation by {} bytes. Current 
reservation: {} bytes. \
+                            num rows: {}, reason: {:?}, Spilling...",
+                            in_mem_size,
+                            reservation.size(),
+                            num_rows,
+                            e,
+                        );
+                        spill_writer_opt =
+                            self.spill_in_mem_batches(&mut in_mem_batches, 
metrics)?;
+                    }
+                }
+                Some(spill_writer) => {
+                    spill_writer.append(&build_side_batch)?;
+                }
+            }
         }
 
         let geo_statistics = analyzer.finish();
@@ -140,19 +216,55 @@ impl BuildSideBatchesCollector {
             &self.spatial_join_options,
         );
 
-        // Try to grow the reservation with a safety buffer to leave room for 
additional data structures
-        let buffer_bytes = ((extra_mem + reservation.size()) as f64
-            * BUILD_SIDE_RESERVATION_BUFFER_RATIO)
-            .ceil() as usize;
-        let additional_reservation = extra_mem + buffer_bytes;
-        reservation.try_grow(additional_reservation)?;
+        // Try to grow the reservation a bit more to account for any 
underestimation of
+        // memory usage. We proceed even when the growth fails.
+        let additional_reservation = extra_mem + (extra_mem + 
reservation.size()) / 5;
+        if let Err(e) = reservation.try_grow(additional_reservation) {
+            log::debug!(
+                "Failed to grow reservation by {} bytes to account for spatial 
index building memory usage. \
+                Current reservation: {} bytes. reason: {:?}",
+                additional_reservation,
+                reservation.size(),
+                e,
+            );
+        }
+
+        // If force spill is enabled, flush everything to disk regardless of 
whether the memory
+        // is enough or not.
+        if self.spatial_join_options.debug.force_spill && 
spill_writer_opt.is_none() {
+            log::debug!(
+                "Force spilling enabled. Spilling {} in-memory batches to 
disk.",
+                in_mem_batches.len()
+            );
+            spill_writer_opt = self.spill_in_mem_batches(&mut in_mem_batches, 
metrics)?;
+        }
+
+        let build_side_batch_stream: SendableEvaluatedBatchStream = match 
spill_writer_opt {
+            Some(spill_writer) => {
+                let spill_file = spill_writer.finish()?;
+                if !in_mem_batches.is_empty() {
+                    return sedona_internal_err!(
+                        "In-memory batches should have been spilled when spill 
file exists"
+                    );
+                }
+                Box::pin(ExternalEvaluatedBatchStream::try_from_spill_file(
+                    Arc::new(spill_file),
+                )?)
+            }
+            None => {
+                let schema = stream.schema();
+                Box::pin(InMemoryEvaluatedBatchStream::new(schema, 
in_mem_batches))
+            }
+        };
+
+        let estimated_spatial_index_memory_usage = total_size_bytes + 
extra_mem;
 
         Ok(BuildPartition {
-            build_side_batch_stream: 
Box::pin(InMemoryEvaluatedBatchStream::new(
-                stream.schema(),
-                in_mem_batches,
-            )),
+            num_rows: total_num_rows,
+            build_side_batch_stream,
             geo_statistics,
+            bbox_samples: bbox_sampler.into_samples(),
+            estimated_spatial_index_memory_usage,
             reservation,
         })
     }
@@ -163,16 +275,28 @@ impl BuildSideBatchesCollector {
         reservations: Vec<MemoryReservation>,
         metrics_vec: Vec<CollectBuildSideMetrics>,
         concurrent: bool,
+        seed: u64,
     ) -> Result<Vec<BuildPartition>> {
         if streams.is_empty() {
             return Ok(vec![]);
         }
 
+        assert_eq!(
+            streams.len(),
+            reservations.len(),
+            "each build stream must have a reservation"
+        );
+        assert_eq!(
+            streams.len(),
+            metrics_vec.len(),
+            "each build stream must have a metrics collector"
+        );
+
         if concurrent {
-            self.collect_all_concurrently(streams, reservations, metrics_vec)
+            self.collect_all_concurrently(streams, reservations, metrics_vec, 
seed)
                 .await
         } else {
-            self.collect_all_sequentially(streams, reservations, metrics_vec)
+            self.collect_all_sequentially(streams, reservations, metrics_vec, 
seed)
                 .await
         }
     }
@@ -182,8 +306,9 @@ impl BuildSideBatchesCollector {
         streams: Vec<SendableRecordBatchStream>,
         reservations: Vec<MemoryReservation>,
         metrics_vec: Vec<CollectBuildSideMetrics>,
+        seed: u64,
     ) -> Result<Vec<BuildPartition>> {
-        // Spawn a task for each stream to scan all streams concurrently
+        // Spawn task for each stream to scan all streams concurrently
         let mut join_set = JoinSet::new();
         for (partition_id, ((stream, metrics), reservation)) in streams
             .into_iter()
@@ -193,11 +318,18 @@ impl BuildSideBatchesCollector {
         {
             let collector = self.clone();
             let evaluator = Arc::clone(&self.evaluator);
+            let bbox_sampler = BoundingBoxSampler::try_new(
+                self.spatial_join_options.min_index_side_bbox_samples,
+                self.spatial_join_options.max_index_side_bbox_samples,
+                self.spatial_join_options
+                    .target_index_side_bbox_sampling_rate,
+                seed.wrapping_add(partition_id as u64),
+            )?;
             join_set.spawn(async move {
                 let evaluated_stream =
                     create_evaluated_build_stream(stream, evaluator, 
metrics.time_taken.clone());
                 let result = collector
-                    .collect(evaluated_stream, reservation, &metrics)
+                    .collect(evaluated_stream, reservation, bbox_sampler, 
&metrics)
                     .await;
                 (partition_id, result)
             });
@@ -224,20 +356,247 @@ impl BuildSideBatchesCollector {
         streams: Vec<SendableRecordBatchStream>,
         reservations: Vec<MemoryReservation>,
         metrics_vec: Vec<CollectBuildSideMetrics>,
+        seed: u64,
     ) -> Result<Vec<BuildPartition>> {
         // Collect partitions sequentially (for JNI/embedded contexts)
         let mut results = Vec::with_capacity(streams.len());
-        for ((stream, metrics), reservation) in
-            streams.into_iter().zip(metrics_vec).zip(reservations)
+        for (partition_id, ((stream, metrics), reservation)) in streams
+            .into_iter()
+            .zip(metrics_vec)
+            .zip(reservations)
+            .enumerate()
         {
             let evaluator = Arc::clone(&self.evaluator);
+            let bbox_sampler = BoundingBoxSampler::try_new(
+                self.spatial_join_options.min_index_side_bbox_samples,
+                self.spatial_join_options.max_index_side_bbox_samples,
+                self.spatial_join_options
+                    .target_index_side_bbox_sampling_rate,
+                seed.wrapping_add(partition_id as u64),
+            )?;
+
             let evaluated_stream =
                 create_evaluated_build_stream(stream, evaluator, 
metrics.time_taken.clone());
             let result = self
-                .collect(evaluated_stream, reservation, &metrics)
+                .collect(evaluated_stream, reservation, bbox_sampler, &metrics)
                 .await?;
             results.push(result);
         }
         Ok(results)
     }
+
+    fn spill_in_mem_batches(
+        &self,
+        in_mem_batches: &mut Vec<EvaluatedBatch>,
+        metrics: &CollectBuildSideMetrics,
+    ) -> Result<Option<EvaluatedBatchSpillWriter>> {
+        if in_mem_batches.is_empty() {
+            return Ok(None);
+        }
+
+        let build_side_batch = &in_mem_batches[0];
+
+        let schema = build_side_batch.schema();
+        let sedona_type = &build_side_batch.geom_array.sedona_type;
+        let mut spill_writer = EvaluatedBatchSpillWriter::try_new(
+            Arc::clone(&self.runtime_env),
+            schema,
+            sedona_type,
+            "spilling build side batches",
+            self.spill_compression,
+            metrics.spill_metrics.clone(),
+            if self
+                .spatial_join_options
+                .spilled_batch_in_memory_size_threshold
+                == 0
+            {
+                None
+            } else {
+                Some(
+                    self.spatial_join_options
+                        .spilled_batch_in_memory_size_threshold,
+                )
+            },
+        )?;
+
+        for in_mem_batch in in_mem_batches.iter() {
+            spill_writer.append(in_mem_batch)?;
+        }
+
+        in_mem_batches.clear();
+        Ok(Some(spill_writer))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{
+        operand_evaluator::EvaluatedGeometryArray,
+        spatial_predicate::{RelationPredicate, SpatialRelationType},
+    };
+    use arrow_array::{ArrayRef, BinaryArray, Int32Array, RecordBatch};
+    use arrow_schema::{DataType, Field, Schema};
+    use datafusion_common::ScalarValue;
+    use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, 
MemoryPool};
+    use datafusion_physical_expr::expressions::Literal;
+    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+    use futures::TryStreamExt;
+    use sedona_common::SpatialJoinOptions;
+    use sedona_schema::datatypes::WKB_GEOMETRY;
+
+    fn test_schema() -> Arc<Schema> {
+        Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]))
+    }
+
+    fn sample_batch(ids: &[i32], wkbs: Vec<Option<Vec<u8>>>) -> 
Result<EvaluatedBatch> {
+        assert_eq!(ids.len(), wkbs.len());
+        let id_array = Arc::new(Int32Array::from(ids.to_vec())) as ArrayRef;
+        let batch = RecordBatch::try_new(test_schema(), vec![id_array])?;
+        let geom_values: Vec<Option<&[u8]>> = wkbs
+            .iter()
+            .map(|wkb_opt| wkb_opt.as_ref().map(|wkb| wkb.as_slice()))
+            .collect();
+        let geom_array: ArrayRef = Arc::new(BinaryArray::from(geom_values));
+        let geom = EvaluatedGeometryArray::try_new(geom_array, &WKB_GEOMETRY)?;
+        Ok(EvaluatedBatch {
+            batch,
+            geom_array: geom,
+        })
+    }
+
+    fn point_wkb(x: f64, y: f64) -> Vec<u8> {
+        let mut buf = vec![1u8, 1, 0, 0, 0];
+        buf.extend_from_slice(&x.to_le_bytes());
+        buf.extend_from_slice(&y.to_le_bytes());
+        buf
+    }
+
+    fn build_collector() -> BuildSideBatchesCollector {
+        let expr: Arc<dyn datafusion_physical_expr::PhysicalExpr> =
+            Arc::new(Literal::new(ScalarValue::Null));
+        let predicate = SpatialPredicate::Relation(RelationPredicate::new(
+            Arc::clone(&expr),
+            expr,
+            SpatialRelationType::Intersects,
+        ));
+        BuildSideBatchesCollector::new(
+            predicate,
+            SpatialJoinOptions::default(),
+            Arc::new(RuntimeEnv::default()),
+            SpillCompression::Uncompressed,
+        )
+    }
+
+    fn memory_reservation(limit: usize) -> (MemoryReservation, Arc<dyn 
MemoryPool>) {
+        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(limit));
+        let consumer = 
MemoryConsumer::new("build-side-test").with_can_spill(true);
+        let reservation = consumer.register(&pool);
+        (reservation, pool)
+    }
+
+    fn build_stream(batches: Vec<EvaluatedBatch>) -> 
SendableEvaluatedBatchStream {
+        let schema = batches
+            .first()
+            .map(|batch| batch.schema())
+            .unwrap_or_else(test_schema);
+        Box::pin(InMemoryEvaluatedBatchStream::new(schema, batches))
+    }
+
+    fn collect_ids(batches: &[EvaluatedBatch]) -> Vec<i32> {
+        let mut ids = Vec::new();
+        for batch in batches {
+            let array = batch
+                .batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap();
+            for i in 0..array.len() {
+                ids.push(array.value(i));
+            }
+        }
+        ids
+    }
+
+    #[tokio::test]
+    async fn collect_keeps_batches_in_memory_when_capacity_suffices() -> 
Result<()> {
+        let collector = build_collector();
+        let (reservation, _pool) = memory_reservation(10 * 1024 * 1024);
+        let sampler = BoundingBoxSampler::try_new(1, 4, 1.0, 7)?;
+        let batch_a = sample_batch(
+            &[0, 1],
+            vec![Some(point_wkb(0.0, 0.0)), Some(point_wkb(1.0, 1.0))],
+        )?;
+        let batch_b = sample_batch(&[2], vec![Some(point_wkb(2.0, 2.0))])?;
+        let stream = build_stream(vec![batch_a, batch_b]);
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let metrics = CollectBuildSideMetrics::new(0, &metrics_set);
+
+        let partition = collector
+            .collect(stream, reservation, sampler, &metrics)
+            .await?;
+        let stream = partition.build_side_batch_stream;
+        let is_external = stream.is_external();
+        let batches: Vec<EvaluatedBatch> = stream.try_collect().await?;
+        assert!(!is_external, "Expected in-memory batches");
+        assert_eq!(collect_ids(&batches), vec![0, 1, 2]);
+        assert_eq!(partition.num_rows, 3);
+        assert_eq!(metrics.num_batches.value(), 2);
+        assert_eq!(metrics.num_rows.value(), 3);
+        assert!(metrics.total_size_bytes.value() > 0);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn collect_spills_when_reservation_cannot_grow() -> Result<()> {
+        let collector = build_collector();
+        let sampler = BoundingBoxSampler::try_new(1, 2, 1.0, 13)?;
+        let bbox_mem = sampler.estimate_maximum_memory_usage();
+        let (reservation, _pool) = memory_reservation(bbox_mem + 1);
+        let batch_a = sample_batch(
+            &[10, 11],
+            vec![Some(point_wkb(5.0, 5.0)), Some(point_wkb(6.0, 6.0))],
+        )?;
+        let batch_b = sample_batch(&[12], vec![Some(point_wkb(7.0, 7.0))])?;
+        let stream = build_stream(vec![batch_a, batch_b]);
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let metrics = CollectBuildSideMetrics::new(0, &metrics_set);
+
+        let partition = collector
+            .collect(stream, reservation, sampler, &metrics)
+            .await?;
+        let stream = partition.build_side_batch_stream;
+        let is_external = stream.is_external();
+        let batches: Vec<EvaluatedBatch> = stream.try_collect().await?;
+        assert!(is_external, "Expected batches to spill to disk");
+        assert_eq!(collect_ids(&batches), vec![10, 11, 12]);
+        let spill_metrics = metrics.spill_metrics;
+        assert!(spill_metrics.spill_file_count.value() >= 1);
+        assert!(spill_metrics.spilled_rows.value() >= 1);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn collect_handles_empty_stream() -> Result<()> {
+        let collector = build_collector();
+        let (reservation, _pool) = memory_reservation(1024);
+        let sampler = BoundingBoxSampler::try_new(1, 2, 1.0, 19)?;
+        let stream = build_stream(Vec::new());
+        let metrics_set = ExecutionPlanMetricsSet::new();
+        let metrics = CollectBuildSideMetrics::new(0, &metrics_set);
+
+        let partition = collector
+            .collect(stream, reservation, sampler, &metrics)
+            .await?;
+        assert_eq!(partition.num_rows, 0);
+        let stream = partition.build_side_batch_stream;
+        let is_external = stream.is_external();
+        let batches: Vec<EvaluatedBatch> = stream.try_collect().await?;
+        assert!(!is_external);
+        assert!(batches.is_empty());
+        assert_eq!(metrics.num_batches.value(), 0);
+        assert_eq!(metrics.num_rows.value(), 0);
+        Ok(())
+    }
 }
diff --git a/rust/sedona-spatial-join/src/optimizer.rs 
b/rust/sedona-spatial-join/src/optimizer.rs
index bd01821b..a8c28167 100644
--- a/rust/sedona-spatial-join/src/optimizer.rs
+++ b/rust/sedona-spatial-join/src/optimizer.rs
@@ -235,18 +235,20 @@ impl SpatialJoinOptimizer {
     fn try_optimize_join(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
+        config: &ConfigOptions,
     ) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
         // Check if this is a NestedLoopJoinExec that we can convert to 
spatial join
         if let Some(nested_loop_join) = 
plan.as_any().downcast_ref::<NestedLoopJoinExec>() {
-            if let Some(spatial_join) = 
self.try_convert_to_spatial_join(nested_loop_join)? {
+            if let Some(spatial_join) =
+                self.try_convert_to_spatial_join(nested_loop_join, config)?
+            {
                 return Ok(Transformed::yes(spatial_join));
             }
         }
 
         // Check if this is a HashJoinExec with spatial filter that we can 
convert to spatial join
         if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
-            if let Some(spatial_join) = 
self.try_convert_hash_join_to_spatial(hash_join)? {
+            if let Some(spatial_join) = 
self.try_convert_hash_join_to_spatial(hash_join, config)? {
                 return Ok(Transformed::yes(spatial_join));
             }
         }
@@ -261,7 +263,12 @@ impl SpatialJoinOptimizer {
     fn try_convert_to_spatial_join(
         &self,
         nested_loop_join: &NestedLoopJoinExec,
+        config: &ConfigOptions,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let Some(options) = config.extensions.get::<SedonaOptions>() else {
+            return Ok(None);
+        };
+
         if let Some(join_filter) = nested_loop_join.filter() {
             if let Some((spatial_predicate, remainder)) = 
transform_join_filter(join_filter) {
                 // The left side of the nested loop join is required to have 
only one partition, while SpatialJoinExec
@@ -300,6 +307,7 @@ impl SpatialJoinOptimizer {
                     remainder,
                     join_type,
                     nested_loop_join.projection().cloned(),
+                    &options.spatial_join,
                 )?;
 
                 return Ok(Some(Arc::new(spatial_join)));
@@ -316,7 +324,12 @@ impl SpatialJoinOptimizer {
     fn try_convert_hash_join_to_spatial(
         &self,
         hash_join: &HashJoinExec,
+        config: &ConfigOptions,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let Some(options) = config.extensions.get::<SedonaOptions>() else {
+            return Ok(None);
+        };
+
         // Check if the filter contains spatial predicates
         if let Some(join_filter) = hash_join.filter() {
             if let Some((spatial_predicate, mut remainder)) = 
transform_join_filter(join_filter) {
@@ -354,6 +367,7 @@ impl SpatialJoinOptimizer {
                     remainder,
                     hash_join.join_type(),
                     None, // No projection in SpatialJoinExec
+                    &options.spatial_join,
                     true, // converted_from_hash_join = true
                 )?);
 


Reply via email to