Kontinuation commented on code in PR #523:
URL: https://github.com/apache/sedona-db/pull/523#discussion_r2699008083


##########
rust/sedona-spatial-join/src/index/spatial_index.rs:
##########
@@ -409,6 +416,177 @@ impl SpatialIndex {
         })
     }
 
+    /// Query the spatial index with a batch of probe geometries to find 
matching build-side geometries.
+    ///
+    /// This method iterates over the probe geometries in the given range of 
the evaluated batch.
+    /// For each probe geometry, it performs the two-phase spatial join query:
+    /// 1. **Filter phase**: Uses the R-tree index with the probe geometry's 
bounding rectangle
+    ///    to quickly identify candidate geometries.
+    /// 2. **Refinement phase**: Evaluates the exact spatial predicate on 
candidates to determine
+    ///    actual matches.
+    ///
+    /// # Arguments
+    /// * `evaluated_batch` - The batch containing probe geometries and their 
bounding rectangles
+    /// * `range` - The range of rows in the evaluated batch to process
+    /// * `max_result_size` - The maximum number of results to collect before 
stopping. Once the
+    ///   number of results reaches or exceeds this limit after a probe row has been processed, the method returns early.
+    /// * `build_batch_positions` - Output vector that will be populated with 
(batch_idx, row_idx)
+    ///   pairs for each matching build-side geometry
+    /// * `probe_indices` - Output vector that will be populated with the 
index of the probe geometry
+    ///   for each match, repeated once per matching build-side geometry
+    ///
+    /// # Returns
+    /// * A tuple containing:
+    ///   - `QueryResultMetrics`: Aggregated metrics (total matches and 
candidates) for the processed rows
+    ///   - `usize`: The index of the next row to process (exclusive end of 
the processed range)
+    pub(crate) async fn query_batch(
+        self: &Arc<Self>,
+        evaluated_batch: &Arc<EvaluatedBatch>,
+        range: Range<usize>,
+        max_result_size: usize,
+        build_batch_positions: &mut Vec<(i32, i32)>,
+        probe_indices: &mut Vec<u32>,
+    ) -> Result<(QueryResultMetrics, usize)> {
+        if range.is_empty() {
+            return Ok((
+                QueryResultMetrics {
+                    count: 0,
+                    candidate_count: 0,
+                },
+                range.start,
+            ));
+        }
+
+        let rects = evaluated_batch.rects();
+        let dist = evaluated_batch.distance();
+        let mut total_candidates_count = 0;
+        let mut total_count = 0;
+        let mut current_row_idx = range.start;
+        for row_idx in range {
+            current_row_idx = row_idx;
+            let Some(probe_rect) = rects[row_idx] else {
+                continue;
+            };
+
+            let min = probe_rect.min();
+            let max = probe_rect.max();
+            let mut candidates = self.rtree.search(min.x, min.y, max.x, max.y);
+            if candidates.is_empty() {
+                continue;
+            }
+
+            let Some(probe_wkb) = evaluated_batch.wkb(row_idx) else {
+                return sedona_internal_err!(
+                    "Failed to get WKB for row {} in evaluated batch",
+                    row_idx
+                );
+            };
+
+            // Sort and dedup candidates to avoid duplicate results when we 
index one geometry
+            // using several boxes.
+            candidates.sort_unstable();
+            candidates.dedup();
+
+            let distance = match dist {
+                Some(dist_array) => distance_value_at(dist_array, row_idx)?,
+                None => None,
+            };
+
+            // Refine the candidates retrieved from the r-tree index by 
evaluating the actual spatial predicate
+            let refine_chunk_size = 
self.options.parallel_refinement_chunk_size;
+            if refine_chunk_size == 0 || candidates.len() < refine_chunk_size 
* 2 {
+                // For small candidate sets, use refine synchronously
+                let metrics =
+                    self.refine(probe_wkb, &candidates, &distance, 
build_batch_positions)?;
+                probe_indices.extend(std::iter::repeat_n(row_idx as u32, 
metrics.count));
+                total_count += metrics.count;
+                total_candidates_count += metrics.candidate_count;
+            } else {
+                // For large candidate sets, spawn several tasks to 
parallelize refinement
+                let (metrics, positions) = self
+                    .refine_concurrently(
+                        evaluated_batch,
+                        row_idx,
+                        &candidates,
+                        distance,
+                        refine_chunk_size,
+                    )
+                    .await?;
+                build_batch_positions.extend(positions);
+                probe_indices.extend(std::iter::repeat_n(row_idx as u32, 
metrics.count));
+                total_count += metrics.count;
+                total_candidates_count += metrics.candidate_count;
+            }
+
+            if total_count >= max_result_size {
+                break;
+            }
+        }
+
+        let end_idx = current_row_idx + 1;

Review Comment:
   No, this is incorrect. `current_row_idx` will still be the last valid index 
even when the loop terminates naturally.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to