Kontinuation commented on code in PR #573:
URL: https://github.com/apache/sedona-db/pull/573#discussion_r2780484222
########## rust/sedona-spatial-join/src/probe/knn_results_merger.rs: ##########
@@ -0,0 +1,2226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use core::f64;
+use std::ops::Range;
+use std::sync::Arc;
+
+use arrow::array::{
+    Array, ArrayBuilder, AsArray, Float64Array, ListArray, OffsetBufferBuilder, PrimitiveBuilder,
+    RecordBatch, StructArray, UInt64Array,
+};
+use arrow::buffer::OffsetBuffer;
+use arrow::compute::{concat, concat_batches, interleave};
+use arrow::datatypes::{DataType, Field, Float64Type, Schema, SchemaRef, UInt64Type};
+use arrow_array::ArrayRef;
+use arrow_schema::Fields;
+use datafusion::config::SpillCompression;
+use datafusion_common::{arrow_datafusion_err, DataFusionError, Result};
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::runtime_env::RuntimeEnv;
+use datafusion_physical_plan::metrics::SpillMetrics;
+use sedona_common::sedona_internal_err;
+
+use crate::index::spatial_index::DISTANCE_TOLERANCE;
+use crate::utils::spill::{RecordBatchSpillReader, RecordBatchSpillWriter};
+
+/// [UnprocessedKNNResultBatch] represents the KNN results produced by probing the spatial index.
+/// An [UnprocessedKNNResultBatch] may include KNN results for multiple probe rows.
+///
+/// The KNN results are stored in a StructArray, where each row corresponds to a KNN result.
+/// The results for the same probe row are stored in contiguous rows, and the offsets to
+/// split the results into groups per probe row are stored in the `offsets` field.
+///
+/// Each probe row has a unique index. The index must be strictly increasing
+/// across probe rows. The sequence of indices across the entire sequence of ingested
+/// [UnprocessedKNNResultBatch] must also be strictly increasing. The index is computed based on
+/// the 0-based index of the probe row in this probe partition.
+///
+/// The KNN results are filtered, meaning that the original KNN results obtained by probing
+/// the spatial index may be further filtered based on some predicates. It is also possible that
+/// all the KNN results for a probe row are filtered out. However, we still need to keep track of the
+/// distances of unfiltered results to correctly compute the top-K distances before filtering. This
+/// is critical for correctly merging KNN results from multiple partitions.
+///
+/// Imagine that a KNN query for a probe row yields the following 5 results (K = 5):
+///
+/// ```text
+/// D0 D1 D2 D3 D4
+/// R0    R2 R3
+/// ```
+///
+/// Where Di is the distance of the i-th nearest neighbor, and Ri is the result row index.
+/// R1 and R4 are filtered out based on some predicate, so the final results only contain R0, R2, and R3.
+/// The core idea is that the filtering is applied AFTER determining the top-K distances, so the number
+/// of final results may be less than K.
+///
+/// However, if we split the object side of the KNN join into 2 partitions, and the KNN results from
+/// each partition are as follows:
+///
+/// ```text
+/// Partition 0:
+/// D1 D3 D5 D6 D7
+///    R3    R6 R7
+///
+/// Partition 1:
+/// D0 D2 D4 D8 D9
+/// R0 R2    R8
+/// ```
+///
+/// If we blindly merge the filtered results from both partitions and take the top-k, we would get:
+///
+/// ```text
+/// D0 D2 D3 D6 D8
+/// R0 R2 R3 R6 R8
+/// ```
+///
+/// which contains more results than the single-partitioned KNN join (i.e., 5 results instead of 3). This is
+/// incorrect.
+///
+/// When merging the results from both partitions, we need to consider the distances of all unfiltered
+/// results to correctly determine the top-K distances before filtering. In this case, the top-5 distances
+/// are D0, D1, D2, D3, and D4. We take D4 as the distance threshold to filter merged results. After filtering,
+/// we still get R0, R2, and R3 as the final results.
+///
+/// Please note that the KNN results for the last probe row in this array may be incomplete
+/// due to batch slicing during probe result batch production. We should be cautious
+/// and correctly handle the KNN results for each probe row across multiple slices.
+///
+/// Here is a concrete example: the [UnprocessedKNNResultBatch] may contain KNN results for 3 probe rows:
+///
+/// ```text
+/// [P0, R00]
+/// [P0, R01]
+/// [P0, R02]
+/// [P1, R10]
+/// [P1, R11]
+/// [P1, R12]
+/// [P2, R20]
+/// ```
+///
+/// Where Pi is the i-th probe row, and Rij is the j-th KNN result for probe row Pi.
+/// The KNN results for probe row P2 could be incomplete, and the next ingested KNN result batch
+/// may contain more results for probe row P2:
+///
+/// ```text
+/// [P2, R21]
+/// [P2, R22]
+/// [P3, R30]
+/// ...
+/// ```
+///
+/// In practice, we process the KNN results of a probe row only when we have seen all its results.
+/// The possibly incomplete tail part of an ingested [UnprocessedKNNResultBatch] is sliced and concatenated with
+/// the next ingested [UnprocessedKNNResultBatch] to form a complete set of KNN results for that probe row.
+/// This slicing and concatenating won't happen frequently in practice (once per ingested batch
+/// on average), so the performance impact is minimal.
+struct UnprocessedKNNResultBatch {
+    row_array: StructArray,
+    probe_indices: Vec<usize>,
+    distances: Vec<f64>,
+    unfiltered_probe_indices: Vec<usize>,
+    unfiltered_distances: Vec<f64>,
+}
+
+impl UnprocessedKNNResultBatch {
+    fn new(
+        row_array: StructArray,
+        probe_indices: Vec<usize>,
+        distances: Vec<f64>,
+        unfiltered_probe_indices: Vec<usize>,
+        unfiltered_distances: Vec<f64>,
+    ) -> Self {
+        Self {
+            row_array,
+            probe_indices,
+            distances,
+            unfiltered_probe_indices,
+            unfiltered_distances,
+        }
+    }
+
+    /// Create a new [UnprocessedKNNResultBatch] representing the unprocessed tail KNN results
+    /// from an unprocessed [KNNProbeResult].
+    fn new_unprocessed_tail(tail: KNNProbeResult<'_>, row_array: &StructArray) -> Self {
+        let index = tail.probe_row_index;
+        let num_rows = tail.row_range.len();
+        let num_unfiltered_rows = tail.unfiltered_distances.len();
+
+        let sliced_row_array = row_array.slice(tail.row_range.start, num_rows);
+        let probe_indices = vec![index; num_rows];
+        let distances = tail.distances.to_vec();
+        let unfiltered_probe_indices = vec![index; num_unfiltered_rows];
+        let unfiltered_distances = tail.unfiltered_distances.to_vec();
+
+        Self {
+            row_array: sliced_row_array,
+            probe_indices,
+            distances,
+            unfiltered_probe_indices,
+            unfiltered_distances,
+        }
+    }
+
+    /// Merge the current [UnprocessedKNNResultBatch] with another one, producing a new
+    /// [UnprocessedKNNResultBatch].
+    fn merge(self, other: Self) -> Result<Self> {
+        let concat_array =
+            concat(&[&self.row_array, &other.row_array]).map_err(|e| arrow_datafusion_err!(e))?;
+        let mut probe_indices = self.probe_indices;
+        probe_indices.extend(other.probe_indices);
+        let mut distances = self.distances;
+        distances.extend(other.distances);
+        let mut unfiltered_probe_indices = self.unfiltered_probe_indices;
+        unfiltered_probe_indices.extend(other.unfiltered_probe_indices);
+        let mut unfiltered_distances = self.unfiltered_distances;
+        unfiltered_distances.extend(other.unfiltered_distances);
+
+        Ok(Self {
+            row_array: concat_array.as_struct().clone(),
+            probe_indices,
+            distances,
+            unfiltered_probe_indices,
+            unfiltered_distances,
+        })
+    }
+}
+
+/// Reorganize [UnprocessedKNNResultBatch] for easier processing. The main goal is to group KNN results by
+/// probe row index. There is an iterator implementation [KNNProbeResultIterator] that yields
+/// [KNNProbeResult] for each probe row in order.
+struct KNNResultArray {
+    /// The KNN result batches produced by probing the spatial index with a probe batch
+    array: StructArray,
+    /// Distance for each KNN result row
+    distances: Vec<f64>,
+    /// Index for each probe row; this must be strictly increasing.
+    indices: Vec<usize>,
+    /// Offsets to split the batches into groups per probe row. It is always of length
+    /// `indices.len() + 1`.
+    offsets: Vec<usize>,
+    /// Indices for each unfiltered probe row. This is a superset of `indices`.
+    /// This must be strictly increasing.
+    unfiltered_indices: Vec<usize>,
+    /// Distances for each unfiltered KNN result row. This is a superset of `distances`.
+    unfiltered_distances: Vec<f64>,
+    /// Offsets to split the unfiltered distances into groups per probe row. It is always of length
+    /// `unfiltered_indices.len() + 1`.
+    unfiltered_offsets: Vec<usize>,
+}
+
+impl KNNResultArray {
+    fn new(unprocessed_batch: UnprocessedKNNResultBatch) -> Self {
+        let UnprocessedKNNResultBatch {
+            row_array,
+            probe_indices,
+            distances,
+            unfiltered_probe_indices,
+            unfiltered_distances,
+            ..
+        } = unprocessed_batch;
+
+        assert_eq!(row_array.len(), probe_indices.len());
+        assert_eq!(probe_indices.len(), distances.len());
+        assert_eq!(unfiltered_probe_indices.len(), unfiltered_distances.len());
+        assert!(probe_indices.len() <= unfiltered_probe_indices.len());
+
+        let compute_range_encoding = |mut indices: Vec<usize>| {
+            let mut offsets = Vec::with_capacity(indices.len() + 1);
+            offsets.push(0);

Review Comment:
   It does not depend on any internal state. I can easily pull it out as a standalone function.
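For readers following along, a standalone version of that range-encoding helper could look roughly like the sketch below. This is only an illustration of the idea, not the code that ended up in the PR; the name `compute_range_encoding` and the exact signature are assumptions. It run-length-encodes a non-decreasing sequence of probe-row indices into distinct indices plus offsets, matching the `indices`/`offsets` invariant documented on `KNNResultArray` (`offsets` always has length `indices.len() + 1`, with an empty input yielding just the initial `0` offset).

```rust
/// Hypothetical standalone range-encoding helper: group a non-decreasing
/// sequence of probe-row indices into (distinct_indices, offsets) such that
/// rows offsets[i]..offsets[i + 1] all belong to probe row distinct_indices[i].
fn compute_range_encoding(indices: &[usize]) -> (Vec<usize>, Vec<usize>) {
    let mut distinct_indices: Vec<usize> = Vec::new();
    let mut offsets = Vec::with_capacity(indices.len() + 1);
    offsets.push(0);
    for (pos, &index) in indices.iter().enumerate() {
        if distinct_indices.last() != Some(&index) {
            // A new probe row starts at `pos`: close the previous group (if any)
            // and start a new one for `index`.
            if !distinct_indices.is_empty() {
                offsets.push(pos);
            }
            distinct_indices.push(index);
        }
    }
    if !distinct_indices.is_empty() {
        offsets.push(indices.len());
    }
    (distinct_indices, offsets)
}
```

For example, `compute_range_encoding(&[5, 5, 5, 7, 7, 9])` returns `([5, 7, 9], [0, 3, 5, 6])`.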
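The partition-merging rule described in the doc comment above is also worth pinning down in code. The following is a minimal, self-contained sketch of that rule for a single probe row, using illustrative types (`(f64, u64)` pairs for filtered results) rather than the PR's actual Arrow-based data structures; the real merger additionally handles incomplete probe-row tails, spilling, and the `DISTANCE_TOLERANCE` slack, all of which are omitted here.

```rust
/// Illustrative sketch of the merge rule: the k-th smallest *unfiltered*
/// distance across all partitions is the threshold, and only filtered
/// (post-predicate) results within that threshold are kept.
fn merge_knn_partitions(
    k: usize,
    unfiltered_distances: &[Vec<f64>],
    filtered_results: &[Vec<(f64, u64)>],
) -> Vec<(f64, u64)> {
    if k == 0 {
        return Vec::new();
    }

    // Determine the global top-k distance threshold before filtering.
    let mut all_unfiltered: Vec<f64> = unfiltered_distances.iter().flatten().copied().collect();
    all_unfiltered.sort_by(|a, b| a.total_cmp(b));
    let threshold = match all_unfiltered.get(k - 1).or(all_unfiltered.last()) {
        Some(d) => *d,
        None => return Vec::new(),
    };

    // Keep only the surviving (filtered) results whose distance is within the threshold.
    let mut merged: Vec<(f64, u64)> = filtered_results
        .iter()
        .flatten()
        .copied()
        .filter(|(distance, _row_id)| *distance <= threshold)
        .collect();
    merged.sort_by(|a, b| a.0.total_cmp(&b.0));
    merged
}
```

Applied to the example in the doc comment (k = 5), the threshold is D4, so only R0, R2, and R3 survive the merge even though the two partitions together reported six surviving candidates.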
########## rust/sedona-spatial-join/src/probe/knn_results_merger.rs: ##########
@@ -0,0 +1,2226 @@
+    /// Merge the current [UnprocessedKNNResultBatch] with another one, producing a new
+    /// [UnprocessedKNNResultBatch].
+    fn merge(self, other: Self) -> Result<Self> {
+        let concat_array =
+            concat(&[&self.row_array, &other.row_array]).map_err(|e| arrow_datafusion_err!(e))?;

Review Comment:
   Removed all redundant calls to arrow_datafusion_err. I forgot why I added them in the first place.

########## rust/sedona-spatial-join/src/exec.rs: ##########
@@ -1478,32 +1489,60 @@ mod tests {
         results
     }

+    #[rstest]
     #[tokio::test]
-    async fn test_knn_join_correctness() -> Result<()> {
+    async fn test_knn_join_correctness(
+        // TODO: Currently the underlying geo-index KNN implementation has bugs working with non-point
+        // geometries, so this test is restricted to point_only = true. Once
+        // https://github.com/georust/geo-index/pull/151 (fixing non-point KNN support) is
+        // released, add #[values(true, false)] here to also exercise non-point data.

Review Comment:
   OK. I have updated the Cargo.toml and the test cases. I think I need to make another round of breaking changes to get rid of the "use-geo_0_31" feature flag.

########## rust/sedona-spatial-join/src/utils/join_utils.rs: ##########
@@ -300,9 +301,28 @@ pub(crate) fn apply_join_filter_to_indices(
     let left_filtered = compute::filter(&build_indices, mask)?;
     let right_filtered = compute::filter(&probe_indices, mask)?;
+
+    let filtered_distances = if let Some(distances) = distances {
+        debug_assert_eq!(
+            distances.len(),
+            build_indices.len(),
+            "distances length should match indices length"
+        );
+        let dist_array = arrow_array::Float64Array::from(distances.to_vec());
+        let filtered = compute::filter(&dist_array, mask)?;
+        let filtered = filtered
+            .as_any()
+            .downcast_ref::<arrow_array::Float64Array>()
+            .expect("filtered distance array should be Float64Array");
+        Some(filtered.values().to_vec())

Review Comment:
   I still prefer using `compute::filter` to make it easy to observe that the computation done for distances is roughly the same as for build_indices and probe_indices.
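To make the trade-off in the last comment concrete: the alternative presumably under discussion is to filter the `distances` slice directly against the boolean mask instead of round-tripping through a `Float64Array`. A hedged sketch of that alternative (names follow the diff above, but this is not code from the PR) might look like:

```rust
use arrow_array::BooleanArray;

/// Hypothetical direct filtering of the distances, mirroring the semantics of
/// arrow's `compute::filter`: null mask entries are treated as "do not keep".
fn filter_distances(distances: &[f64], mask: &BooleanArray) -> Vec<f64> {
    distances
        .iter()
        .zip(mask.iter())
        .filter_map(|(distance, keep)| match keep {
            Some(true) => Some(*distance),
            _ => None,
        })
        .collect()
}
```

The `compute::filter` version chosen in the PR keeps the distance handling visibly symmetric with `build_indices` and `probe_indices`, at the cost of building and downcasting a temporary array.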
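On the `arrow_datafusion_err` comment further up: `DataFusionError` implements `From<ArrowError>`, so inside a function returning `datafusion_common::Result` the explicit `map_err` wrapper can simply become `?`. A minimal sketch of the pattern (a standalone illustration, not the PR's exact code):

```rust
use arrow::array::{ArrayRef, StructArray};
use arrow::compute::concat;
use datafusion_common::Result;

/// The `?` operator converts the ArrowError returned by `concat` into a
/// DataFusionError via its `From` impl, so no explicit
/// `map_err(|e| arrow_datafusion_err!(e))` wrapper is needed.
fn concat_struct_arrays(a: &StructArray, b: &StructArray) -> Result<ArrayRef> {
    let concatenated = concat(&[a, b])?;
    Ok(concatenated)
}
```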
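And for the exec.rs comment about the TODO: once the geo-index fix for non-point KNN lands, parameterizing the test would presumably look something like the sketch below (a hypothetical shape, shown only to illustrate what `#[values(true, false)]` refers to; the body is elided).

```rust
use rstest::rstest;

#[rstest]
#[tokio::test]
async fn test_knn_join_correctness(
    // Exercise both point-only and mixed-geometry build sides once
    // non-point KNN support is available upstream.
    #[values(true, false)] point_only: bool,
) -> datafusion_common::Result<()> {
    // ... build test tables according to `point_only`, run the KNN join,
    // and compare against a brute-force reference ...
    let _ = point_only;
    Ok(())
}
```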
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]