Kontinuation commented on code in PR #573:
URL: https://github.com/apache/sedona-db/pull/573#discussion_r2767033945


##########
rust/sedona-spatial-join/src/probe/knn_results_merger.rs:
##########
@@ -0,0 +1,2224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use core::f64;
+use std::ops::Range;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayBuilder, AsArray, Float64Array, ListArray, OffsetBufferBuilder, PrimitiveBuilder,
+    RecordBatch, StructArray, UInt64Array,
+};
+use arrow::buffer::OffsetBuffer;
+use arrow::compute::{concat, concat_batches, interleave};
+use arrow::datatypes::{DataType, Field, Float64Type, Schema, SchemaRef, UInt64Type};
+use arrow_array::ArrayRef;
+use arrow_schema::Fields;
+use datafusion::config::SpillCompression;
+use datafusion_common::{arrow_datafusion_err, DataFusionError, Result};
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::runtime_env::RuntimeEnv;
+use datafusion_physical_plan::metrics::SpillMetrics;
+use sedona_common::sedona_internal_err;
+
+use crate::index::spatial_index::DISTANCE_TOLERANCE;
+use crate::utils::spill::{RecordBatchSpillReader, RecordBatchSpillWriter};
+
+/// [UnprocessedKNNResultBatch] represents the KNN results produced by probing the spatial index.
+/// An [UnprocessedKNNResultBatch] may include KNN results for multiple probe rows.
+///
+/// The KNN results are stored in a StructArray, where each row corresponds to a KNN result.
+/// The results for the same probe row are stored in contiguous rows, and the offsets to
+/// split the results into groups per probe row are stored in the `offsets` field.
+///
+/// Each probe row has a unique index. The index must be strictly increasing
+/// across probe rows. The sequence of indices across the entire sequence of ingested
+/// [UnprocessedKNNResultBatch]es must also be strictly increasing. The index is computed from
+/// the 0-based index of the probe row in this probe partition.
+///
+/// The KNN results are filtered, meaning that the original KNN results obtained by probing
+/// the spatial index may be further filtered based on some predicates. It is also possible that
+/// all the KNN results for a probe row are filtered out. However, we still need to keep track of the
+/// distances of unfiltered results to correctly compute the top-K distances before filtering. This
+/// is critical for correctly merging KNN results from multiple partitions.
+///
+/// Imagine that a KNN query for a probe row yields the following 5 results (K = 5):
+///
+/// ```text
+/// D0  D1  D2  D3  D4
+/// R0      R2  R3
+/// ```
+///
+/// Where Di is the distance of the i-th nearest neighbor, and Ri is the result row index.
+/// R1 and R4 are filtered out based on some predicate, so the final results only contain R0, R2, and R3.
+/// The core idea is that the filtering is applied AFTER determining the top-K distances, so the number
+/// of final results may be less than K.
+///
+/// However, if we split the object side of the KNN join into 2 partitions, and the KNN results from
+/// each partition are as follows:
+///
+/// ```text
+/// Partition 0:
+/// D1  D3  D5  D6  D7
+///     R3      R6  R7
+///
+/// Partition 1:
+/// D0  D2  D4  D8  D9
+/// R0  R2      R8
+/// ```
+///
+/// If we blindly merge the filtered results from both partitions and take the top-k, we would get:
+///
+/// ```text
+/// D0  D2  D3  D6  D8
+/// R0  R2  R3  R6  R8
+/// ```
+///
+/// This contains more results than the single-partition KNN join (5 results instead of 3), which is
+/// incorrect.
+///
+/// When merging the results from both partitions, we need to consider the distances of all unfiltered
+/// results to correctly determine the top-K distances before filtering. In this case, the top-5 distances
+/// are D0, D1, D2, D3, and D4. We take D4 as the distance threshold to filter the merged results. After
+/// filtering, we still get R0, R2, and R3 as the final results.
+///
+/// Please note that the KNN results for the last probe row in this array may be incomplete
+/// due to batch slicing during probe result batch production. We should be careful to
+/// handle the KNN results for each probe row correctly across multiple slices.
+///
+/// Here is a concrete example: the [UnprocessedKNNResultBatch] may contain KNN results for 3 probe rows:
+///
+/// ```text
+/// [P0, R00]
+/// [P0, R01]
+/// [P0, R02]
+/// [P1, R10]
+/// [P1, R11]
+/// [P1, R12]
+/// [P2, R20]
+/// ```
+///
+/// Where Pi is the i-th probe row, and Rij is the j-th KNN result for probe row Pi.
+/// The KNN results for probe row P2 could be incomplete, and the next ingested KNN result batch
+/// may contain more results for probe row P2:
+///
+/// ```text
+/// [P2, R21]
+/// [P2, R22]
+/// [P3, R30]
+/// ...
+/// ```
+///
+/// In practice, we process the KNN results for a probe row only when we have seen all of its results.
+/// The possibly incomplete tail of an ingested [UnprocessedKNNResultBatch] is sliced and concatenated with
+/// the next ingested [UnprocessedKNNResultBatch] to form a complete set of KNN results for that probe row.
+/// This slicing and concatenation won't happen frequently in practice (once per ingested batch
+/// on average), so the performance impact is minimal.
+struct UnprocessedKNNResultBatch {
+    row_array: StructArray,
+    probe_indices: Vec<usize>,
+    distances: Vec<f64>,
+    unfiltered_probe_indices: Vec<usize>,
+    unfiltered_distances: Vec<f64>,
+}
+
+impl UnprocessedKNNResultBatch {
+    fn new(
+        row_array: StructArray,
+        probe_indices: Vec<usize>,
+        distances: Vec<f64>,
+        unfiltered_probe_indices: Vec<usize>,
+        unfiltered_distances: Vec<f64>,
+    ) -> Self {
+        Self {
+            row_array,
+            probe_indices,
+            distances,
+            unfiltered_probe_indices,
+            unfiltered_distances,
+        }
+    }
+
+    /// Create a new [UnprocessedKNNResultBatch] representing the unprocessed tail KNN results
+    /// from an unprocessed [KNNProbeResult].
+    fn new_unprocessed_tail(tail: KNNProbeResult<'_>, row_array: &StructArray) -> Self {
+        let index = tail.probe_row_index;
+        let num_rows = tail.row_range.len();
+        let num_unfiltered_rows = tail.unfiltered_distances.len();
+
+        let sliced_row_array = row_array.slice(tail.row_range.start, num_rows);
+        let probe_indices = vec![index; num_rows];
+        let distances = tail.distances.to_vec();
+        let unfiltered_probe_indices = vec![index; num_unfiltered_rows];
+        let unfiltered_distances = tail.unfiltered_distances.to_vec();
+
+        Self {
+            row_array: sliced_row_array,
+            probe_indices,
+            distances,
+            unfiltered_probe_indices,
+            unfiltered_distances,
+        }
+    }
+
+    /// Merge the current [UnprocessedKNNResultBatch] with another one, producing a new
+    /// [UnprocessedKNNResultBatch].
+    fn merge(self, other: Self) -> Result<Self> {
+        let concat_array =
+            concat(&[&self.row_array, &other.row_array]).map_err(|e| arrow_datafusion_err!(e))?;
+        let mut probe_indices = self.probe_indices;
+        probe_indices.extend(other.probe_indices);
+        let mut distances = self.distances;
+        distances.extend(other.distances);
+        let mut unfiltered_probe_indices = self.unfiltered_probe_indices;
+        unfiltered_probe_indices.extend(other.unfiltered_probe_indices);
+        let mut unfiltered_distances = self.unfiltered_distances;
+        unfiltered_distances.extend(other.unfiltered_distances);
+
+        Ok(Self {
+            row_array: concat_array.as_struct().clone(),
+            probe_indices,
+            distances,
+            unfiltered_probe_indices,
+            unfiltered_distances,
+        })
+    }
+}
+
+/// Reorganize [UnprocessedKNNResultBatch] for easier processing. The main goal is to group KNN results by
+/// probe row index. There is an iterator implementation [KNNProbeResultIterator] that yields
+/// [KNNProbeResult] for each probe row in order.
+struct KNNResultArray {
+    /// The KNN result batches produced by probing the spatial index with a probe batch
+    array: StructArray,
+    /// Distance for each KNN result row
+    distances: Vec<f64>,
+    /// Index for each probe row; this must be strictly increasing.
+    indices: Vec<usize>,
+    /// Offsets to split the batches into groups per probe row. It is always of length
+    /// `indices.len() + 1`.
+    offsets: Vec<usize>,
+    /// Indices for each unfiltered probe row. This is a superset of `indices` and
+    /// must be strictly increasing.
+    unfiltered_indices: Vec<usize>,
+    /// Distances for each unfiltered KNN result row. This is a superset of `distances`.
+    unfiltered_distances: Vec<f64>,
+    /// Offsets to split the unfiltered distances into groups per probe row. It is always of length
+    /// `unfiltered_indices.len() + 1`.
+    unfiltered_offsets: Vec<usize>,
+}
+
+impl KNNResultArray {
+    fn new(unprocessed_batch: UnprocessedKNNResultBatch) -> Self {
+        let UnprocessedKNNResultBatch {
+            row_array,
+            probe_indices,
+            distances,
+            unfiltered_probe_indices,
+            unfiltered_distances,
+            ..
+        } = unprocessed_batch;
+
+        assert_eq!(row_array.len(), probe_indices.len());
+        assert_eq!(probe_indices.len(), distances.len());
+        assert_eq!(unfiltered_probe_indices.len(), unfiltered_distances.len());
+        assert!(probe_indices.len() <= unfiltered_probe_indices.len());
+
+        let compute_range_encoding = |mut indices: Vec<usize>| {
+            let mut offsets = Vec::with_capacity(indices.len() + 1);
+            offsets.push(0);
+            if indices.is_empty() {
+                return (offsets, Vec::new());
+            }
+
+            let mut prev = indices[0];
+            let mut pos = 1;
+            for i in 1..indices.len() {
+                if indices[i] != prev {
+                    assert!(indices[i] > prev, "indices must be non-decreasing");
+                    offsets.push(i);
+                    indices[pos] = indices[i];
+                    pos += 1;
+                }
+                prev = indices[i];
+            }
+            offsets.push(indices.len());
+            indices.truncate(pos);
+            (offsets, indices)
+        };
+
+        let (offsets, indices) = compute_range_encoding(probe_indices);
+        let (unfiltered_offsets, unfiltered_indices) =
+            compute_range_encoding(unfiltered_probe_indices);
+
+        // The iterator implementation relies on `indices` being an in-order subsequence
+        // of `unfiltered_indices`.
+        debug_assert!({
+            let mut j = 0;
+            let mut ok = true;
+            for &g in &indices {
+                while j < unfiltered_indices.len() && unfiltered_indices[j] < g {
+                    j += 1;
+                }
+                if j >= unfiltered_indices.len() || unfiltered_indices[j] != g {
+                    ok = false;
+                    break;
+                }
+            }
+            ok
+        });
+        Self {
+            array: row_array,
+            distances,
+            indices,
+            offsets,
+            unfiltered_indices,
+            unfiltered_distances,
+            unfiltered_offsets,
+        }
+    }
+}
+
+/// KNNProbeResult represents a unified view of the KNN results for a single probe row.
+/// The KNN results can be from a spilled batch or an ingested batch. This intermediate
+/// data structure is for working with both spilled and ingested KNN results uniformly.
+///
+/// KNNProbeResult can also be used to represent KNN results for a probe row that has
+/// no filtered results. In this case, the `row_range` will be an empty range, and the
+/// `distances` will be an empty slice.
+struct KNNProbeResult<'a> {
+    /// Index of the probe row
+    probe_row_index: usize,
+    /// Range of KNN result rows in the implicitly referenced StructArray. The referenced
+    /// StructArray only contains filtered results.
+    row_range: Range<usize>,
+    /// Distances for each KNN result row
+    distances: &'a [f64],
+    /// Distances for each unfiltered result row. Some of the results were filtered out, so they
+    /// do not appear in the StructArray, but we still need the distances of all unfiltered
+    /// results to correctly compute the top-K distances before the filtering.
+    unfiltered_distances: &'a [f64],
+}
+
+impl<'a> KNNProbeResult<'a> {
+    fn new(
+        probe_row_index: usize,
+        row_range: Range<usize>,
+        distances: &'a [f64],
+        unfiltered_distances: &'a [f64],
+    ) -> Self {
+        assert_eq!(row_range.len(), distances.len());
+        // Please note that we don't have `unfiltered_distances.len() >= distances.len()` here.

Review Comment:
   Explaining them in detail is a good idea. I have added the comment. The comment suggested by Copilot was all wrong.
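For readers following the doc comment's two-partition example, the threshold-based merge can be sketched in isolation. This is a minimal sketch under assumed shapes (`merge_knn_partitions` and its plain `(distance, row)` tuples are illustrative, not the PR's actual API; the real code also applies a `DISTANCE_TOLERANCE` to distance comparisons, which this sketch omits):

```rust
// Minimal sketch of merging filtered KNN results from multiple partitions.
// Each partition contributes (unfiltered_distances, filtered (distance, row)
// pairs). `k` is assumed to be >= 1.
fn merge_knn_partitions(k: usize, partitions: &[(Vec<f64>, Vec<(f64, usize)>)]) -> Vec<usize> {
    // Recover the true top-k distance threshold from ALL unfiltered distances:
    // this is the cutoff a single-partition KNN query would have used.
    let mut all: Vec<f64> = partitions
        .iter()
        .flat_map(|(unfiltered, _)| unfiltered.iter().copied())
        .collect();
    all.sort_by(|a, b| a.partial_cmp(b).unwrap());
    let threshold = all.get(k - 1).copied().unwrap_or(f64::INFINITY);

    // Keep only the surviving (filtered) results within the threshold.
    let mut merged: Vec<(f64, usize)> = partitions
        .iter()
        .flat_map(|(_, filtered)| filtered.iter().copied())
        .filter(|(d, _)| *d <= threshold)
        .collect();
    merged.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
    merged.into_iter().map(|(_, row)| row).collect()
}

fn main() {
    // The doc comment's example: rows named by their distance index.
    let partitions = [
        (vec![1.0, 3.0, 5.0, 6.0, 7.0], vec![(3.0, 3), (6.0, 6), (7.0, 7)]),
        (vec![0.0, 2.0, 4.0, 8.0, 9.0], vec![(0.0, 0), (2.0, 2), (8.0, 8)]),
    ];
    // The top-5 unfiltered distances end at D4 = 4.0, so only R0, R2, R3 survive.
    assert_eq!(merge_knn_partitions(5, &partitions), vec![0, 2, 3]);
}
```

Blindly merging the filtered rows and taking the top 5 would return five rows here; recomputing the threshold from the unfiltered distances reproduces the single-partition answer.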


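The run-length offset encoding performed in `KNNResultArray::new` can also be sketched standalone (an illustrative simplification, not the PR's exact in-place version, which reuses the input vector):

```rust
// Sketch: collapse a non-decreasing list of per-result probe indices into the
// unique probe indices plus offsets delimiting each probe row's group of
// result rows. `offsets` is always of length `unique.len() + 1`.
fn compute_range_encoding(indices: &[usize]) -> (Vec<usize>, Vec<usize>) {
    let mut offsets = vec![0];
    let mut unique: Vec<usize> = Vec::new();
    if indices.is_empty() {
        return (offsets, unique);
    }
    for (i, &idx) in indices.iter().enumerate() {
        match unique.last() {
            Some(&prev) if prev == idx => {} // same probe row, same group
            Some(&prev) => {
                assert!(idx > prev, "indices must be non-decreasing");
                offsets.push(i); // a new probe row's group starts here
                unique.push(idx);
            }
            None => unique.push(idx), // first probe row
        }
    }
    offsets.push(indices.len());
    (offsets, unique)
}

fn main() {
    // Three results for probe row 0, three for row 1, one for row 2.
    let (offsets, unique) = compute_range_encoding(&[0, 0, 0, 1, 1, 1, 2]);
    assert_eq!(offsets, vec![0, 3, 6, 7]);
    assert_eq!(unique, vec![0, 1, 2]);
}
```

Group `j` of results then spans `offsets[j]..offsets[j + 1]` in the row array, which is the invariant the doc comments on `offsets` and `unfiltered_offsets` describe.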

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
