This is an automated email from the ASF dual-hosted git repository.

kontinuation pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new a9ceb6a0 chore(rust/sedona-spatial-join): Revamp memory reservation 
pattern for spatial join (#534)
a9ceb6a0 is described below

commit a9ceb6a05131e25065a22e1c1316376a3ff3671b
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Fri Jan 23 14:54:54 2026 +0800

    chore(rust/sedona-spatial-join): Revamp memory reservation pattern for 
spatial join (#534)
    
    This change shifts spatial join memory planning earlier in the build-side 
collection flow, simplifying reservation handling while improving reliability 
around DataFusion’s reservation behavior. We only reserve memory when 
collecting the build side batches for building (possibly partitioned) spatial 
indexes and don't reserve memory when probing the index and producing result 
batches.
    
    Note: reserving memory while producing batches can trigger DataFusion 
reservation failures (see 
https://github.com/apache/datafusion/issues/17334).
---
 rust/sedona-spatial-join/src/build_index.rs        |   8 +-
 .../src/index/build_side_collector.rs              |  40 ++++-
 rust/sedona-spatial-join/src/index/knn_adapter.rs  |  39 ++---
 .../sedona-spatial-join/src/index/spatial_index.rs |  17 +-
 .../src/index/spatial_index_builder.rs             | 101 +++++++-----
 rust/sedona-spatial-join/src/refine.rs             |  16 +-
 rust/sedona-spatial-join/src/refine/geo.rs         |   4 +
 rust/sedona-spatial-join/src/refine/geos.rs        |   7 +
 rust/sedona-spatial-join/src/refine/tg.rs          |   7 +
 rust/sedona-spatial-join/src/utils.rs              |   1 -
 .../src/utils/concurrent_reservation.rs            | 176 ---------------------
 11 files changed, 157 insertions(+), 259 deletions(-)

diff --git a/rust/sedona-spatial-join/src/build_index.rs 
b/rust/sedona-spatial-join/src/build_index.rs
index f369365c..5a171007 100644
--- a/rust/sedona-spatial-join/src/build_index.rs
+++ b/rust/sedona-spatial-join/src/build_index.rs
@@ -29,7 +29,6 @@ use crate::{
         BuildSideBatchesCollector, CollectBuildSideMetrics, SpatialIndex, 
SpatialIndexBuilder,
         SpatialJoinBuildMetrics,
     },
-    operand_evaluator::create_operand_evaluator,
     spatial_predicate::SpatialPredicate,
 };
 
@@ -57,9 +56,10 @@ pub async fn build_index(
         .unwrap_or_default();
     let concurrent = 
sedona_options.spatial_join.concurrent_build_side_collection;
     let memory_pool = context.memory_pool();
-    let evaluator =
-        create_operand_evaluator(&spatial_predicate, 
sedona_options.spatial_join.clone());
-    let collector = BuildSideBatchesCollector::new(evaluator);
+    let collector = BuildSideBatchesCollector::new(
+        spatial_predicate.clone(),
+        sedona_options.spatial_join.clone(),
+    );
     let num_partitions = build_streams.len();
     let mut collect_metrics_vec = Vec::with_capacity(num_partitions);
     let mut reservations = Vec::with_capacity(num_partitions);
diff --git a/rust/sedona-spatial-join/src/index/build_side_collector.rs 
b/rust/sedona-spatial-join/src/index/build_side_collector.rs
index 90ebb05f..89537592 100644
--- a/rust/sedona-spatial-join/src/index/build_side_collector.rs
+++ b/rust/sedona-spatial-join/src/index/build_side_collector.rs
@@ -22,6 +22,7 @@ use datafusion_common_runtime::JoinSet;
 use datafusion_execution::{memory_pool::MemoryReservation, 
SendableRecordBatchStream};
 use datafusion_physical_plan::metrics::{self, ExecutionPlanMetricsSet, 
MetricBuilder};
 use futures::StreamExt;
+use sedona_common::SpatialJoinOptions;
 use sedona_expr::statistics::GeoStatistics;
 use sedona_functions::st_analyze_agg::AnalyzeAccumulator;
 use sedona_schema::datatypes::WKB_GEOMETRY;
@@ -34,9 +35,16 @@ use crate::{
         },
         EvaluatedBatch,
     },
-    operand_evaluator::OperandEvaluator,
+    index::SpatialIndexBuilder,
+    operand_evaluator::{create_operand_evaluator, OperandEvaluator},
+    SpatialPredicate,
 };
 
+/// Safety buffer applied when pre-growing build-side reservations to leave 
headroom for
+/// auxiliary structures beyond the build batches themselves.
+/// 20% was chosen as a conservative margin.
+const BUILD_SIDE_RESERVATION_BUFFER_RATIO: f64 = 0.20;
+
 pub(crate) struct BuildPartition {
     pub build_side_batch_stream: SendableEvaluatedBatchStream,
     pub geo_statistics: GeoStatistics,
@@ -52,6 +60,8 @@ pub(crate) struct BuildPartition {
 /// spatial index, depending on the statistics collected by the collector.
 #[derive(Clone)]
 pub(crate) struct BuildSideBatchesCollector {
+    spatial_predicate: SpatialPredicate,
+    spatial_join_options: SpatialJoinOptions,
     evaluator: Arc<dyn OperandEvaluator>,
 }
 
@@ -83,8 +93,16 @@ impl CollectBuildSideMetrics {
 }
 
 impl BuildSideBatchesCollector {
-    pub fn new(evaluator: Arc<dyn OperandEvaluator>) -> Self {
-        BuildSideBatchesCollector { evaluator }
+    pub fn new(
+        spatial_predicate: SpatialPredicate,
+        spatial_join_options: SpatialJoinOptions,
+    ) -> Self {
+        let evaluator = create_operand_evaluator(&spatial_predicate, 
spatial_join_options.clone());
+        BuildSideBatchesCollector {
+            spatial_predicate,
+            spatial_join_options,
+            evaluator,
+        }
     }
 
     pub async fn collect(
@@ -115,12 +133,26 @@ impl BuildSideBatchesCollector {
             in_mem_batches.push(build_side_batch);
         }
 
+        let geo_statistics = analyzer.finish();
+        let extra_mem = SpatialIndexBuilder::estimate_extra_memory_usage(
+            &geo_statistics,
+            &self.spatial_predicate,
+            &self.spatial_join_options,
+        );
+
+        // Try to grow the reservation with a safety buffer to leave room for 
additional data structures
+        let buffer_bytes = ((extra_mem + reservation.size()) as f64
+            * BUILD_SIDE_RESERVATION_BUFFER_RATIO)
+            .ceil() as usize;
+        let additional_reservation = extra_mem + buffer_bytes;
+        reservation.try_grow(additional_reservation)?;
+
         Ok(BuildPartition {
             build_side_batch_stream: 
Box::pin(InMemoryEvaluatedBatchStream::new(
                 stream.schema(),
                 in_mem_batches,
             )),
-            geo_statistics: analyzer.finish(),
+            geo_statistics,
             reservation,
         })
     }
diff --git a/rust/sedona-spatial-join/src/index/knn_adapter.rs 
b/rust/sedona-spatial-join/src/index/knn_adapter.rs
index c7f96de9..ec67fe15 100644
--- a/rust/sedona-spatial-join/src/index/knn_adapter.rs
+++ b/rust/sedona-spatial-join/src/index/knn_adapter.rs
@@ -16,11 +16,10 @@
 // under the License.
 
 use once_cell::sync::OnceCell;
-use std::sync::Arc;
 
-use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, 
MemoryReservation};
 use geo_index::rtree::distance::{EuclideanDistance, GeometryAccessor, 
HaversineDistance};
 use geo_types::Geometry;
+use sedona_expr::statistics::GeoStatistics;
 use sedona_geo::to_geo::item_to_geometry;
 
 use crate::evaluated_batch::EvaluatedBatch;
@@ -32,45 +31,39 @@ pub(crate) struct KnnComponents {
     /// Pre-allocated vector for geometry cache - lock-free access
     /// Indexed by rtree data index for O(1) access
     geometry_cache: Vec<OnceCell<Geometry<f64>>>,
-    /// Memory reservation to track geometry cache memory usage
-    _reservation: MemoryReservation,
+    /// Estimated memory usage for decoded geometries
+    estimated_memory_usage: usize,
 }
 
 impl KnnComponents {
     pub fn new(
         cache_size: usize,
         indexed_batches: &[EvaluatedBatch],
-        memory_pool: Arc<dyn MemoryPool>,
     ) -> datafusion_common::Result<Self> {
-        // Create memory consumer and reservation for geometry cache
-        let consumer = MemoryConsumer::new("SpatialJoinKnnGeometryCache");
-        let mut reservation = consumer.register(&memory_pool);
-
-        // Estimate maximum possible memory usage based on WKB sizes
-        let estimated_memory = 
Self::estimate_max_memory_usage(indexed_batches);
-        reservation.try_grow(estimated_memory)?;
-
         // Pre-allocate OnceCell vector
         let geometry_cache = (0..cache_size).map(|_| 
OnceCell::new()).collect();
+        let mut total_wkb_size = 0;
+        for batch in indexed_batches {
+            for wkb in batch.geom_array.wkbs().iter().flatten() {
+                total_wkb_size += wkb.buf().len();
+            }
+        }
 
         Ok(Self {
             euclidean_metric: EuclideanDistance,
             haversine_metric: HaversineDistance::default(),
             geometry_cache,
-            _reservation: reservation,
+            estimated_memory_usage: total_wkb_size,
         })
     }
 
-    /// Estimate the maximum memory usage for decoded geometries based on WKB 
sizes
-    pub fn estimate_max_memory_usage(indexed_batches: &[EvaluatedBatch]) -> 
usize {
-        let mut total_wkb_size = 0;
+    /// Estimate the maximum memory usage for decoded geometries based on 
statistics
+    pub fn estimate_max_memory_usage(build_stats: &GeoStatistics) -> usize {
+        build_stats.total_size_bytes().unwrap_or(0) as usize
+    }
 
-        for batch in indexed_batches {
-            for wkb in batch.geom_array.wkbs().iter().flatten() {
-                total_wkb_size += wkb.buf().len();
-            }
-        }
-        total_wkb_size
+    pub fn estimated_memory_usage(&self) -> usize {
+        self.estimated_memory_usage
     }
 }
 
diff --git a/rust/sedona-spatial-join/src/index/spatial_index.rs 
b/rust/sedona-spatial-join/src/index/spatial_index.rs
index e5e69dd8..9364920a 100644
--- a/rust/sedona-spatial-join/src/index/spatial_index.rs
+++ b/rust/sedona-spatial-join/src/index/spatial_index.rs
@@ -27,7 +27,7 @@ use arrow_array::RecordBatch;
 use arrow_schema::SchemaRef;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_common_runtime::JoinSet;
-use datafusion_execution::memory_pool::{MemoryPool, MemoryReservation};
+use datafusion_execution::memory_pool::MemoryReservation;
 use float_next_after::NextAfter;
 use geo::BoundingRect;
 use geo_index::rtree::{
@@ -51,7 +51,6 @@ use crate::{
     operand_evaluator::{create_operand_evaluator, distance_value_at, 
OperandEvaluator},
     refine::{create_refiner, IndexQueryResultRefiner},
     spatial_predicate::SpatialPredicate,
-    utils::concurrent_reservation::ConcurrentReservation,
 };
 use arrow::array::BooleanBufferBuilder;
 use sedona_common::{option::SpatialJoinOptions, sedona_internal_err, 
ExecutionMode};
@@ -66,9 +65,6 @@ pub struct SpatialIndex {
     /// The refiner for refining the index query results.
     pub(crate) refiner: Arc<dyn IndexQueryResultRefiner>,
 
-    /// Memory reservation for tracking the memory usage of the refiner
-    pub(crate) refiner_reservation: ConcurrentReservation,
-
     /// R-tree index for the geometry batches. It takes MBRs as query windows 
and returns
     /// data indexes. These data indexes should be translated using 
`data_id_to_batch_pos` to get
     /// the original geometry batch index and row index, or translated using 
`prepared_geom_idx_vec`
@@ -112,8 +108,7 @@ impl SpatialIndex {
         schema: SchemaRef,
         options: SpatialJoinOptions,
         probe_threads_counter: AtomicUsize,
-        mut reservation: MemoryReservation,
-        memory_pool: Arc<dyn MemoryPool>,
+        reservation: MemoryReservation,
     ) -> Self {
         let evaluator = create_operand_evaluator(&spatial_predicate, 
options.clone());
         let refiner = create_refiner(
@@ -123,17 +118,14 @@ impl SpatialIndex {
             0,
             GeoStatistics::empty(),
         );
-        let refiner_reservation = reservation.split(0);
-        let refiner_reservation = ConcurrentReservation::try_new(0, 
refiner_reservation).unwrap();
         let rtree = RTreeBuilder::<f32>::new(0).finish::<HilbertSort>();
         let knn_components = matches!(spatial_predicate, 
SpatialPredicate::KNearestNeighbors(_))
-            .then(|| KnnComponents::new(0, &[], memory_pool.clone()).unwrap());
+            .then(|| KnnComponents::new(0, &[]).unwrap());
         Self {
             schema,
             options,
             evaluator,
             refiner,
-            refiner_reservation,
             rtree,
             data_id_to_batch_pos: Vec::new(),
             indexed_batches: Vec::new(),
@@ -632,9 +624,6 @@ impl SpatialIndex {
         let num_results = results.len();
         build_batch_positions.extend(results);
 
-        // Update refiner memory reservation
-        self.refiner_reservation.resize(self.refiner.mem_usage())?;
-
         Ok(QueryResultMetrics {
             count: num_results,
             candidate_count,
diff --git a/rust/sedona-spatial-join/src/index/spatial_index_builder.rs 
b/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
index 49e0d8c6..9d97b539 100644
--- a/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
+++ b/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
@@ -25,7 +25,7 @@ use datafusion_common::{utils::proxy::VecAllocExt, Result};
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, 
MemoryReservation};
 use datafusion_expr::JoinType;
 use futures::StreamExt;
-use geo_index::rtree::{sort::HilbertSort, RTree, RTreeBuilder};
+use geo_index::rtree::{sort::HilbertSort, RTree, RTreeBuilder, RTreeIndex};
 use parking_lot::Mutex;
 use std::sync::{atomic::AtomicUsize, Arc};
 
@@ -35,9 +35,7 @@ use crate::{
     operand_evaluator::create_operand_evaluator,
     refine::create_refiner,
     spatial_predicate::SpatialPredicate,
-    utils::{
-        concurrent_reservation::ConcurrentReservation, 
join_utils::need_produce_result_in_final,
-    },
+    utils::join_utils::need_produce_result_in_final,
 };
 
 // Type aliases for better readability
@@ -48,10 +46,6 @@ type RTreeBuildResult = (SpatialRTree, DataIdToBatchPos);
 /// Rough estimate for in-memory size of the rtree per rect in bytes
 const RTREE_MEMORY_ESTIMATE_PER_RECT: usize = 60;
 
-/// The prealloc size for the refiner reservation. This is used to reduce the 
frequency of growing
-/// the reservation when updating the refiner memory reservation.
-const REFINER_RESERVATION_PREALLOC_SIZE: usize = 10 * 1024 * 1024; // 10MB
-
 /// Builder for constructing a SpatialIndex from geometry batches.
 ///
 /// This builder handles:
@@ -75,8 +69,8 @@ pub struct SpatialIndexBuilder {
     /// Statistics for indexed geometries
     stats: GeoStatistics,
 
-    /// Memory pool for managing the memory usage of the spatial index
-    memory_pool: Arc<dyn MemoryPool>,
+    /// Memory used by the spatial index
+    memory_used: usize,
 }
 
 /// Metrics for the build phase of the spatial join.
@@ -121,10 +115,43 @@ impl SpatialIndexBuilder {
             indexed_batches: Vec::new(),
             reservation,
             stats: GeoStatistics::empty(),
-            memory_pool,
+            memory_used: 0,
         })
     }
 
+    /// Estimate the amount of memory required by the R-tree index and 
evaluating spatial predicates.
+    /// The estimated memory usage does not include the memory required for 
holding the build side
+    /// batches.
+    pub fn estimate_extra_memory_usage(
+        geo_stats: &GeoStatistics,
+        spatial_predicate: &SpatialPredicate,
+        options: &SpatialJoinOptions,
+    ) -> usize {
+        // Estimate the amount of memory needed by the refiner
+        let num_geoms = geo_stats.total_geometries().unwrap_or(0) as usize;
+        let refiner = create_refiner(
+            options.spatial_library,
+            spatial_predicate,
+            options.clone(),
+            num_geoms,
+            geo_stats.clone(),
+        );
+        let refiner_mem_usage = refiner.estimate_max_memory_usage(geo_stats);
+
+        let knn_components_mem_usage =
+            if matches!(spatial_predicate, 
SpatialPredicate::KNearestNeighbors(_)) {
+                KnnComponents::estimate_max_memory_usage(geo_stats)
+            } else {
+                0
+            };
+
+        // Estimate the amount of memory needed for the R-tree
+        let rtree_mem_usage = num_geoms * RTREE_MEMORY_ESTIMATE_PER_RECT;
+
+        // The final estimation is the sum of all above
+        refiner_mem_usage + knn_components_mem_usage + rtree_mem_usage
+    }
+
     /// Add a geometry batch to be indexed.
     ///
     /// This method accumulates geometry batches that will be used to build 
the spatial index.
@@ -132,8 +159,7 @@ impl SpatialIndexBuilder {
     pub fn add_batch(&mut self, indexed_batch: EvaluatedBatch) -> Result<()> {
         let in_mem_size = indexed_batch.in_mem_size()?;
         self.indexed_batches.push(indexed_batch);
-        self.reservation.grow(in_mem_size);
-        self.metrics.build_mem_used.add(in_mem_size);
+        self.record_memory_usage(in_mem_size);
         Ok(())
     }
 
@@ -154,10 +180,6 @@ impl SpatialIndexBuilder {
 
         let mut rtree_builder = RTreeBuilder::<f32>::new(num_rects as u32);
         let mut batch_pos_vec = vec![(-1, -1); num_rects];
-        let rtree_mem_estimate = num_rects * RTREE_MEMORY_ESTIMATE_PER_RECT;
-
-        self.reservation
-            .grow(batch_pos_vec.allocated_size() + rtree_mem_estimate);
 
         for (batch_idx, batch) in self.indexed_batches.iter().enumerate() {
             let rects = batch.rects();
@@ -175,7 +197,8 @@ impl SpatialIndexBuilder {
         let rtree = rtree_builder.finish::<HilbertSort>();
         build_timer.done();
 
-        self.metrics.build_mem_used.add(self.reservation.size());
+        let mem_usage = rtree.metadata().data_buffer_length() + 
batch_pos_vec.allocated_size();
+        self.record_memory_usage(mem_usage);
 
         Ok((rtree, batch_pos_vec))
     }
@@ -199,8 +222,7 @@ impl SpatialIndexBuilder {
             bitmaps.push(bitmap);
         }
 
-        self.reservation.try_grow(total_buffer_size)?;
-        self.metrics.build_mem_used.add(total_buffer_size);
+        self.record_memory_usage(total_buffer_size);
 
         Ok(Some(Mutex::new(bitmaps)))
     }
@@ -216,7 +238,8 @@ impl SpatialIndexBuilder {
         }
 
         let mut geom_idx_vec = Vec::with_capacity(batch_pos_vec.len());
-        self.reservation.grow(geom_idx_vec.allocated_size());
+        self.record_memory_usage(geom_idx_vec.allocated_size());
+
         for (batch_idx, row_idx) in batch_pos_vec {
             // Convert (batch_idx, row_idx) to a linear, sequential index
             let batch_offset = batch_idx_offset[*batch_idx as usize];
@@ -236,7 +259,6 @@ impl SpatialIndexBuilder {
                 self.options,
                 AtomicUsize::new(self.probe_threads_count),
                 self.reservation,
-                self.memory_pool.clone(),
             ));
         }
 
@@ -248,6 +270,7 @@ impl SpatialIndexBuilder {
             .sum::<usize>();
 
         let (rtree, batch_pos_vec) = self.build_rtree()?;
+
         let geom_idx_vec = self.build_geom_idx_vec(&batch_pos_vec);
         let visited_build_side = self.build_visited_bitmaps()?;
 
@@ -256,35 +279,36 @@ impl SpatialIndexBuilder {
             &self.spatial_predicate,
             self.options.clone(),
             num_geoms,
-            self.stats,
+            self.stats.clone(),
         );
-        let consumer = MemoryConsumer::new("SpatialJoinRefiner");
-        let refiner_reservation = consumer.register(&self.memory_pool);
-        let refiner_reservation =
-            ConcurrentReservation::try_new(REFINER_RESERVATION_PREALLOC_SIZE, 
refiner_reservation)
-                .unwrap();
+        
self.record_memory_usage(refiner.estimate_max_memory_usage(&self.stats));
 
         let cache_size = batch_pos_vec.len();
-        let knn_components = matches!(
-            self.spatial_predicate,
-            SpatialPredicate::KNearestNeighbors(_)
-        )
-        .then(|| KnnComponents::new(cache_size, &self.indexed_batches, 
self.memory_pool.clone()))
-        .transpose()?;
+        let knn_components_opt = {
+            if matches!(
+                self.spatial_predicate,
+                SpatialPredicate::KNearestNeighbors(_)
+            ) {
+                let knn_components = KnnComponents::new(cache_size, 
&self.indexed_batches)?;
+                
self.record_memory_usage(knn_components.estimated_memory_usage());
+                Some(knn_components)
+            } else {
+                None
+            }
+        };
 
         Ok(SpatialIndex {
             schema: self.schema,
             options: self.options,
             evaluator,
             refiner,
-            refiner_reservation,
             rtree,
             data_id_to_batch_pos: batch_pos_vec,
             indexed_batches: self.indexed_batches,
             geom_idx_vec,
             visited_build_side,
             probe_threads_counter: AtomicUsize::new(self.probe_threads_count),
-            knn_components,
+            knn_components: knn_components_opt,
             reservation: self.reservation,
         })
     }
@@ -307,4 +331,9 @@ impl SpatialIndexBuilder {
         self.reservation.try_grow(mem_bytes)?;
         Ok(())
     }
+
+    fn record_memory_usage(&mut self, bytes: usize) {
+        self.memory_used += bytes;
+        self.metrics.build_mem_used.set_max(self.memory_used);
+    }
 }
diff --git a/rust/sedona-spatial-join/src/refine.rs 
b/rust/sedona-spatial-join/src/refine.rs
index 26fc2899..6c41de42 100644
--- a/rust/sedona-spatial-join/src/refine.rs
+++ b/rust/sedona-spatial-join/src/refine.rs
@@ -59,9 +59,23 @@ pub(crate) trait IndexQueryResultRefiner: Send + Sync {
         index_query_results: &[IndexQueryResult],
     ) -> Result<Vec<(i32, i32)>>;
 
+    /// Estimate the maximum memory usage of the refiner in bytes. Some 
refiner may hold prepared
+    /// geometry in memory to accelerate predicate evaluations and consume 
non-trivial amount of
+    /// memory. This method estimates the maximum memory usage based on the 
statistics of the build-side
+    /// that will be processed. The estimation may not be accurate. The actual 
memory usage
+    /// can be obtained from spatial join metrics (see 
[`crate::stream::SpatialJoinProbeMetrics`]
+    /// for more details)
+    ///
+    /// # Arguments
+    /// * `build_stats` - The statistics of build-side geometry batches that 
will be processed by the refiner.
+    ///
+    /// # Returns
+    /// * `usize` - Estimated maximum memory usage in bytes
+    fn estimate_max_memory_usage(&self, build_stats: &GeoStatistics) -> usize;
+
     /// Get the current memory usage of the refiner in bytes.
     ///
-    /// Used for memory tracking and reservation management. Implementations 
should account for
+    /// Used for updating memory usage metrics. Implementations should account 
for
     /// prepared geometry caches, internal data structures, and temporary 
computation buffers.
     ///
     /// # Returns
diff --git a/rust/sedona-spatial-join/src/refine/geo.rs 
b/rust/sedona-spatial-join/src/refine/geo.rs
index 5d13b5e4..408f575d 100644
--- a/rust/sedona-spatial-join/src/refine/geo.rs
+++ b/rust/sedona-spatial-join/src/refine/geo.rs
@@ -174,6 +174,10 @@ impl IndexQueryResultRefiner for GeoRefiner {
         }
     }
 
+    fn estimate_max_memory_usage(&self, _build_stats: &GeoStatistics) -> usize 
{
+        0
+    }
+
     fn mem_usage(&self) -> usize {
         0
     }
diff --git a/rust/sedona-spatial-join/src/refine/geos.rs 
b/rust/sedona-spatial-join/src/refine/geos.rs
index be6fbf90..8cee6d50 100644
--- a/rust/sedona-spatial-join/src/refine/geos.rs
+++ b/rust/sedona-spatial-join/src/refine/geos.rs
@@ -345,6 +345,13 @@ impl IndexQueryResultRefiner for GeosRefiner {
         }
     }
 
+    fn estimate_max_memory_usage(&self, build_stats: &GeoStatistics) -> usize {
+        // TODO: This is a rough estimate of the memory usage of the prepared 
geometry and
+        // may not be accurate.
+        // https://github.com/apache/sedona-db/issues/281
+        build_stats.total_size_bytes().unwrap_or(0) as usize * 4
+    }
+
     fn mem_usage(&self) -> usize {
         self.mem_usage.load(Ordering::Relaxed)
     }
diff --git a/rust/sedona-spatial-join/src/refine/tg.rs 
b/rust/sedona-spatial-join/src/refine/tg.rs
index 4b1213d6..01e6a411 100644
--- a/rust/sedona-spatial-join/src/refine/tg.rs
+++ b/rust/sedona-spatial-join/src/refine/tg.rs
@@ -267,6 +267,13 @@ impl IndexQueryResultRefiner for TgRefiner {
         }
     }
 
+    fn estimate_max_memory_usage(&self, build_stats: &GeoStatistics) -> usize {
+        // TODO: This is a rough estimate of the memory usage of the tg 
geometry and
+        // may not be accurate.
+        // https://github.com/apache/sedona-db/issues/281
+        build_stats.total_size_bytes().unwrap_or(0) as usize * 2
+    }
+
     fn mem_usage(&self) -> usize {
         self.mem_usage.load(Ordering::Relaxed)
     }
diff --git a/rust/sedona-spatial-join/src/utils.rs 
b/rust/sedona-spatial-join/src/utils.rs
index 3f53df47..42a257f0 100644
--- a/rust/sedona-spatial-join/src/utils.rs
+++ b/rust/sedona-spatial-join/src/utils.rs
@@ -17,7 +17,6 @@
 
 pub(crate) mod arrow_utils;
 pub(crate) mod bbox_sampler;
-pub(crate) mod concurrent_reservation;
 pub(crate) mod init_once_array;
 pub(crate) mod join_utils;
 pub(crate) mod once_fut;
diff --git a/rust/sedona-spatial-join/src/utils/concurrent_reservation.rs 
b/rust/sedona-spatial-join/src/utils/concurrent_reservation.rs
deleted file mode 100644
index c319528f..00000000
--- a/rust/sedona-spatial-join/src/utils/concurrent_reservation.rs
+++ /dev/null
@@ -1,176 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-use datafusion_common::Result;
-use parking_lot::Mutex;
-use std::sync::atomic::{AtomicUsize, Ordering};
-
-use datafusion_execution::memory_pool::MemoryReservation;
-
-/// A wrapper around `MemoryReservation` that reserves memory ahead of time to 
reduce the contention
-/// caused by concurrent access to the reservation. It will reserve more 
memory than requested to
-/// skip growing the reservation for the next few reservation growth requests, 
until the actually
-/// reserved size is smaller than the requested size.
-pub(crate) struct ConcurrentReservation {
-    reservation: Mutex<MemoryReservation>,
-    /// The size of reservation. This should be equal to `reservation.size()`. 
This is used to
-    /// minimize contention and avoid growing the underlying reservation in 
the fast path.
-    reserved_size: AtomicUsize,
-    prealloc_size: usize,
-}
-
-impl ConcurrentReservation {
-    pub fn try_new(prealloc_size: usize, reservation: MemoryReservation) -> 
Result<Self> {
-        let actual_size = reservation.size();
-
-        Ok(Self {
-            reservation: Mutex::new(reservation),
-            reserved_size: AtomicUsize::new(actual_size),
-            prealloc_size,
-        })
-    }
-
-    /// Resize the reservation to the given size. If the new size is smaller 
or equal to the current
-    /// reserved size, do nothing. Otherwise grow the reservation to be 
`prealloc_size` larger than
-    /// the new size. This is for reducing the frequency of growing the 
underlying reservation.
-    pub fn resize(&self, new_size: usize) -> Result<()> {
-        // Fast path: the reserved size is already large enough, no need to 
lock and grow the reservation
-        if new_size <= self.reserved_size.load(Ordering::Relaxed) {
-            return Ok(());
-        }
-
-        // Slow path: lock the mutex for possible reservation growth
-        let mut reservation = self.reservation.lock();
-        let current_size = reservation.size();
-
-        // Double-check under the lock in case another thread already grew it
-        if new_size <= current_size {
-            return Ok(());
-        }
-
-        // Grow the reservation to the target size
-        let growth_needed = new_size + self.prealloc_size - current_size;
-        reservation.try_grow(growth_needed)?;
-
-        // Update our atomic to reflect the new size
-        let final_size = reservation.size();
-        self.reserved_size.store(final_size, Ordering::Relaxed);
-
-        Ok(())
-    }
-
-    #[cfg(test)]
-    pub fn actual_size(&self) -> usize {
-        self.reservation.lock().size()
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool};
-    use std::sync::Arc;
-    use std::thread;
-
-    fn create_test_reservation(size: usize) -> MemoryReservation {
-        let pool: Arc<dyn MemoryPool> =
-            
Arc::new(datafusion_execution::memory_pool::UnboundedMemoryPool::default());
-        let consumer = MemoryConsumer::new("test");
-        let mut reservation = consumer.register(&pool);
-        if size > 0 {
-            reservation.grow(size);
-        }
-        reservation
-    }
-
-    #[test]
-    fn test_basic_functionality() {
-        let reservation = create_test_reservation(0);
-        let concurrent = ConcurrentReservation::try_new(100, 
reservation).unwrap();
-
-        assert_eq!(concurrent.reserved_size.load(Ordering::Relaxed), 0);
-
-        // Resize to smaller should be no-op
-        concurrent.resize(0).unwrap();
-        assert_eq!(concurrent.reserved_size.load(Ordering::Relaxed), 0);
-
-        // Resize to larger should grow
-        let initial_size = concurrent.reserved_size.load(Ordering::Relaxed);
-        concurrent.resize(200).unwrap();
-        let final_size = concurrent.reserved_size.load(Ordering::Relaxed);
-        assert!(final_size >= 200);
-        assert!(final_size > initial_size);
-        assert!(concurrent.actual_size() >= 200);
-    }
-
-    #[test]
-    fn test_with_existing_reservation() {
-        let reservation = create_test_reservation(50);
-        let concurrent = ConcurrentReservation::try_new(100, 
reservation).unwrap();
-
-        // Initial state should have already reserved size accounted
-        assert_eq!(concurrent.reserved_size.load(Ordering::Relaxed), 50);
-
-        // Smaller size won't grow the reservation
-        concurrent.resize(10).unwrap();
-        assert_eq!(concurrent.reserved_size.load(Ordering::Relaxed), 50);
-        assert_eq!(concurrent.actual_size(), 50);
-
-        // Won't grow the reservation
-        concurrent.resize(50).unwrap();
-        assert_eq!(concurrent.reserved_size.load(Ordering::Relaxed), 50);
-        assert_eq!(concurrent.actual_size(), 50);
-
-        // Resize to larger should grow
-        concurrent.resize(100).unwrap();
-        assert_eq!(concurrent.reserved_size.load(Ordering::Relaxed), 200);
-        assert_eq!(concurrent.actual_size(), 200);
-    }
-
-    #[test]
-    fn test_concurrent_access() {
-        let reservation = create_test_reservation(0);
-        let concurrent = Arc::new(ConcurrentReservation::try_new(1000, 
reservation).unwrap());
-
-        let mut handles = vec![];
-
-        // Spawn multiple threads that try to resize concurrently
-        for i in 0..10 {
-            let concurrent_clone: Arc<ConcurrentReservation> = 
Arc::clone(&concurrent);
-            let handle = thread::spawn(move || {
-                // Each thread tries to resize to a different size
-                let target_size = (i + 1) * 500;
-                concurrent_clone.resize(target_size).unwrap();
-
-                // Verify the invariant: actual size should be >= requested 
size
-                let actual = 
concurrent_clone.reserved_size.load(Ordering::Relaxed);
-                assert!(actual >= target_size);
-                let actual = concurrent_clone.actual_size();
-                assert!(actual >= target_size);
-            });
-            handles.push(handle);
-        }
-
-        // Wait for all threads to complete
-        for handle in handles {
-            handle.join().unwrap();
-        }
-
-        // Final size should be at least as large as the largest request
-        assert!(concurrent.reserved_size.load(Ordering::Relaxed) >= 5000);
-        assert!(concurrent.actual_size() >= 5000);
-    }
-}

Reply via email to