Copilot commented on code in PR #523:
URL: https://github.com/apache/sedona-db/pull/523#discussion_r2698935039
##########
rust/sedona-spatial-join/src/stream.rs:
##########
@@ -242,54 +250,51 @@ impl SpatialJoinStream {
}
}
- fn process_probe_batch(&mut self) ->
Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
- let timer = self.join_metrics.join_time.timer();
+ fn process_probe_batch(
+ &mut self,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
+ let _timer = self.join_metrics.join_time.timer();
Review Comment:
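The variable is prefixed with an underscore even though it is not actually
unused: the timer measures execution time until it is dropped at the end of the
scope. The measurement is already correct as written. A named `_timer` binding,
unlike a bare `let _ = ...`, keeps the guard alive until the end of the scope;
the underscore prefix only suppresses the `unused_variables` lint. Removing the
underscore (as suggested below) is purely a readability choice that signals the
binding is used for timing. Without the underscore, the compiler will emit an
unused-variable warning unless the guard is also used explicitly.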
```suggestion
let timer = self.join_metrics.join_time.timer();
```
##########
rust/sedona-spatial-join/src/index/spatial_index.rs:
##########
@@ -409,6 +416,177 @@ impl SpatialIndex {
})
}
+ /// Query the spatial index with a batch of probe geometries to find
matching build-side geometries.
+ ///
+ /// This method iterates over the probe geometries in the given range of
the evaluated batch.
+ /// For each probe geometry, it performs the two-phase spatial join query:
+ /// 1. **Filter phase**: Uses the R-tree index with the probe geometry's
bounding rectangle
+ /// to quickly identify candidate geometries.
+ /// 2. **Refinement phase**: Evaluates the exact spatial predicate on
candidates to determine
+ /// actual matches.
+ ///
+ /// # Arguments
+ /// * `evaluated_batch` - The batch containing probe geometries and their
bounding rectangles
+ /// * `range` - The range of rows in the evaluated batch to process
+ /// * `max_result_size` - The maximum number of results to collect before
stopping. If the
+ /// number of results exceeds this limit, the method returns early.
+ /// * `build_batch_positions` - Output vector that will be populated with
(batch_idx, row_idx)
+ /// pairs for each matching build-side geometry
+ /// * `probe_indices` - Output vector that will be populated with the
indices of probe geometries
+ /// that have matches each probe geometry processed
+ ///
+ /// # Returns
+ /// * A tuple containing:
+ /// - `QueryResultMetrics`: Aggregated metrics (total matches and
candidates) for the processed rows
+ /// - `usize`: The index of the next row to process (exclusive end of
the processed range)
+ pub(crate) async fn query_batch(
+ self: &Arc<Self>,
+ evaluated_batch: &Arc<EvaluatedBatch>,
+ range: Range<usize>,
+ max_result_size: usize,
+ build_batch_positions: &mut Vec<(i32, i32)>,
+ probe_indices: &mut Vec<u32>,
+ ) -> Result<(QueryResultMetrics, usize)> {
Review Comment:
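The documentation for the `probe_indices` parameter ends with the garbled phrase
"that have matches each probe geometry processed". It should say that, for each
match found, the row index of the probe geometry that produced it is appended to
`probe_indices`. This matches how the code extends it with
`std::iter::repeat_n(row_idx as u32, metrics.count)`. A probe row with several
matches therefore appears several times, and a row with no matches does not
appear at all.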
##########
rust/sedona-spatial-join/src/index/spatial_index.rs:
##########
@@ -409,6 +416,177 @@ impl SpatialIndex {
})
}
+ /// Query the spatial index with a batch of probe geometries to find
matching build-side geometries.
+ ///
+ /// This method iterates over the probe geometries in the given range of
the evaluated batch.
+ /// For each probe geometry, it performs the two-phase spatial join query:
+ /// 1. **Filter phase**: Uses the R-tree index with the probe geometry's
bounding rectangle
+ /// to quickly identify candidate geometries.
+ /// 2. **Refinement phase**: Evaluates the exact spatial predicate on
candidates to determine
+ /// actual matches.
+ ///
+ /// # Arguments
+ /// * `evaluated_batch` - The batch containing probe geometries and their
bounding rectangles
+ /// * `range` - The range of rows in the evaluated batch to process
+ /// * `max_result_size` - The maximum number of results to collect before
stopping. If the
+ /// number of results exceeds this limit, the method returns early.
+ /// * `build_batch_positions` - Output vector that will be populated with
(batch_idx, row_idx)
+ /// pairs for each matching build-side geometry
+ /// * `probe_indices` - Output vector that will be populated with the
indices of probe geometries
+ /// that have matches each probe geometry processed
+ ///
+ /// # Returns
+ /// * A tuple containing:
+ /// - `QueryResultMetrics`: Aggregated metrics (total matches and
candidates) for the processed rows
+ /// - `usize`: The index of the next row to process (exclusive end of
the processed range)
+ pub(crate) async fn query_batch(
+ self: &Arc<Self>,
+ evaluated_batch: &Arc<EvaluatedBatch>,
+ range: Range<usize>,
+ max_result_size: usize,
+ build_batch_positions: &mut Vec<(i32, i32)>,
+ probe_indices: &mut Vec<u32>,
+ ) -> Result<(QueryResultMetrics, usize)> {
+ if range.is_empty() {
+ return Ok((
+ QueryResultMetrics {
+ count: 0,
+ candidate_count: 0,
+ },
+ range.start,
+ ));
+ }
+
+ let rects = evaluated_batch.rects();
+ let dist = evaluated_batch.distance();
+ let mut total_candidates_count = 0;
+ let mut total_count = 0;
+ let mut current_row_idx = range.start;
+ for row_idx in range {
+ current_row_idx = row_idx;
+ let Some(probe_rect) = rects[row_idx] else {
+ continue;
+ };
+
+ let min = probe_rect.min();
+ let max = probe_rect.max();
+ let mut candidates = self.rtree.search(min.x, min.y, max.x, max.y);
+ if candidates.is_empty() {
+ continue;
+ }
+
+ let Some(probe_wkb) = evaluated_batch.wkb(row_idx) else {
+ return sedona_internal_err!(
+ "Failed to get WKB for row {} in evaluated batch",
+ row_idx
+ );
+ };
+
+ // Sort and dedup candidates to avoid duplicate results when we
index one geometry
+ // using several boxes.
+ candidates.sort_unstable();
+ candidates.dedup();
+
+ let distance = match dist {
+ Some(dist_array) => distance_value_at(dist_array, row_idx)?,
+ None => None,
+ };
+
+ // Refine the candidates retrieved from the r-tree index by
evaluating the actual spatial predicate
+ let refine_chunk_size =
self.options.parallel_refinement_chunk_size;
+ if refine_chunk_size == 0 || candidates.len() < refine_chunk_size
* 2 {
+ // For small candidate sets, use refine synchronously
+ let metrics =
+ self.refine(probe_wkb, &candidates, &distance,
build_batch_positions)?;
+ probe_indices.extend(std::iter::repeat_n(row_idx as u32,
metrics.count));
+ total_count += metrics.count;
+ total_candidates_count += metrics.candidate_count;
+ } else {
+ // For large candidate sets, spawn several tasks to
parallelize refinement
+ let (metrics, positions) = self
+ .refine_concurrently(
+ evaluated_batch,
+ row_idx,
+ &candidates,
+ distance,
+ refine_chunk_size,
+ )
+ .await?;
+ build_batch_positions.extend(positions);
+ probe_indices.extend(std::iter::repeat_n(row_idx as u32,
metrics.count));
+ total_count += metrics.count;
+ total_candidates_count += metrics.candidate_count;
+ }
+
+ if total_count >= max_result_size {
+ break;
+ }
+ }
+
+ let end_idx = current_row_idx + 1;
Review Comment:
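When the loop exits due to reaching `max_result_size`, `current_row_idx`
points to the row that triggered the limit, so adding 1 is correct.

When the loop completes normally (all rows in range processed),
`current_row_idx` is the last value produced by the range iterator, i.e.
`range.end - 1`. The early `range.is_empty()` return guarantees at least one
iteration, and `current_row_idx` is assigned before any `continue`. So
`current_row_idx + 1` already equals `range.end`. Clamping with
`min(current_row_idx + 1, range.end)` is a defensive guard against future
changes, not a fix for an existing bug.

Also, `for row_idx in range` moves `range` because `Range<usize>` is not
`Copy`. The suggestion below therefore only compiles if `range.end` is captured
before the loop, e.g. with `let range_end = range.end;`.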
```suggestion
let end_idx = current_row_idx.saturating_add(1).min(range.end);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]