Copilot commented on code in PR #563:
URL: https://github.com/apache/sedona-db/pull/563#discussion_r2747508993


##########
rust/sedona-spatial-join/src/stream.rs:
##########
@@ -524,6 +759,19 @@ impl SpatialJoinStream {
             spatial_index.merge_probe_stats(stats);
         }
 
+        let visited_probe_side = if matches!(partition_desc.partition, 
SpatialPartition::Multi) {
+            self.visited_multi_probe_side.clone()
+        } else {
+            None
+        };

Review Comment:
   The Arc clone operation for `visited_multi_probe_side` happens on every 
batch iteration within the Multi partition. Consider storing this value once 
when entering the Multi partition state instead of cloning it for each batch.



##########
rust/sedona-spatial-join/src/utils/join_utils.rs:
##########
@@ -53,6 +53,28 @@ pub(crate) fn need_produce_result_in_final(join_type: 
JoinType) -> bool {
     )
 }
 
+/// Determines if a bitmap is needed to track matched rows in the probe side's 
"Multi" partition.
+///
+/// In a spatial partitioned join, the "Multi" partition of the probe side 
overlaps with multiple
+/// partitions of the build side. Consequently, rows in the probe "Multi" 
partition are processed
+/// against multiple build partitions.
+///
+/// For `Right`, `RightSemi`, `RightAnti`, and `Full` joins, we must track 
whether a probe row
+/// has been matched across *any* of these interactions to correctly produce 
results:
+/// - **Right/Full Outer**: Emit probe rows that never matched any build 
partition (checked at the last build partition).
+/// - **Right Semi**: Emit a probe row the first time it matches, and suppress 
subsequent matches (deduplication).
+/// - **Right Anti**: Emit probe rows only if they never match any build 
partition (checked at the last build partition).
+pub(crate) fn need_probe_multi_partition_bitmap(join_type: JoinType) -> bool {
+    matches!(
+        join_type,
+        JoinType::Right
+            | JoinType::RightAnti
+            | JoinType::RightSemi
+            | JoinType::RightMark
+            | JoinType::Full
+    )
+}

Review Comment:
   The function name `need_probe_multi_partition_bitmap` is somewhat ambiguous. 
Consider renaming to `need_multi_partition_visited_bitmap` or 
`requires_probe_deduplication_bitmap` to more clearly indicate that this bitmap 
is specifically for tracking visited/matched rows across multiple build 
partitions.



##########
rust/sedona-spatial-join/src/stream.rs:
##########
@@ -185,49 +213,155 @@ impl SpatialJoinProbeMetrics {
 pub(crate) enum SpatialJoinStreamState {
     /// The initial mode: waiting for the spatial join components to become 
available
     WaitPrepareSpatialJoinComponents,
-    /// The initial mode: waiting for the spatial index to be built
-    WaitBuildIndex,
+    /// Wait for a specific partition's index. The boolean denotes whether 
this stream should kick
+    /// off building the index (`true`) or simply wait for someone else to 
build it (`false`).
+    WaitBuildIndex(u32, bool),
     /// Indicates that build-side has been collected, and stream is ready for
-    /// fetching probe-side
-    FetchProbeBatch,
+    /// fetching probe-side batches
+    FetchProbeBatch(PartitionDescriptor),
     /// Indicates that we're processing a probe batch using the batch iterator
     ProcessProbeBatch(
+        PartitionDescriptor,
         BoxFuture<'static, (Box<SpatialJoinBatchIterator>, 
Result<Option<RecordBatch>>)>,
     ),
-    /// Indicates that probe-side has been fully processed
-    ExhaustedProbeSide,
+    /// Indicates that we have exhausted the current probe stream, move to the 
Multi partition
+    /// or prepare for emitting unmatched build batch
+    ExhaustedProbeStream(PartitionDescriptor),
+    /// Indicates that probe-side has been fully processed, prepare iterator 
for producing
+    /// unmatched build side batches for outer join
+    PrepareUnmatchedBuildBatch(PartitionDescriptor),
     /// Indicates that we're processing unmatched build-side batches using an 
iterator
-    ProcessUnmatchedBuildBatch(UnmatchedBuildBatchIterator),
+    ProcessUnmatchedBuildBatch(PartitionDescriptor, 
UnmatchedBuildBatchIterator),
+    /// Prepare for processing the next partition.
+    /// If the last partition has been processed, simply transfer to 
[`SpatialJoinStreamState::Completed`];
+    /// If the there's still more partitions to process, then transfer to 
[`SpatialJoinStreamState::WaitBuildIndex`] state.
+    /// If we are the last one finishing processing the current partition, we 
can safely
+    /// drop the current index and kick off the building of the index for the 
next partition.
+    PrepareForNextPartition(u32, bool),
     /// Indicates that SpatialJoinStream execution is completed
     Completed,
 }
 
+impl std::fmt::Debug for SpatialJoinStreamState {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::WaitPrepareSpatialJoinComponents => write!(f, 
"WaitPrepareSpatialJoinComponents"),
+            Self::WaitBuildIndex(id, build) => f
+                .debug_tuple("WaitBuildIndex")
+                .field(id)
+                .field(build)
+                .finish(),
+            Self::FetchProbeBatch(desc) => 
f.debug_tuple("FetchProbeBatch").field(desc).finish(),
+            Self::ProcessProbeBatch(desc, _) => {
+                f.debug_tuple("ProcessProbeBatch").field(desc).finish()
+            }
+            Self::ExhaustedProbeStream(desc) => {
+                f.debug_tuple("ExhaustedProbeStream").field(desc).finish()
+            }
+            Self::PrepareUnmatchedBuildBatch(desc) => f
+                .debug_tuple("PrepareUnmatchedBuildBatch")
+                .field(desc)
+                .finish(),
+            Self::ProcessUnmatchedBuildBatch(desc, iter) => f
+                .debug_tuple("ProcessUnmatchedBuildBatch")
+                .field(desc)
+                .field(iter)
+                .finish(),
+            Self::PrepareForNextPartition(id, last) => f
+                .debug_tuple("PrepareForNextPartition")
+                .field(id)
+                .field(last)
+                .finish(),
+            Self::Completed => write!(f, "Completed"),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+pub(crate) struct PartitionDescriptor {
+    partition_id: u32,
+    partition: SpatialPartition,
+}
+
+impl PartitionDescriptor {
+    fn regular(partition_id: u32) -> Self {
+        Self {
+            partition_id,
+            partition: SpatialPartition::Regular(partition_id),
+        }
+    }
+
+    fn multi(partition_id: u32) -> Self {
+        Self {
+            partition_id,
+            partition: SpatialPartition::Multi,
+        }

Review Comment:
   The `PartitionDescriptor` struct contains both `partition_id` and 
`partition` where `partition` can be `Regular(partition_id)`. This creates 
redundancy and potential inconsistency. Consider deriving `partition_id` from 
`partition` when needed, or ensuring these fields are always synchronized.
   ```suggestion
       fn new(partition_id: u32, partition: SpatialPartition) -> Self {
           if let SpatialPartition::Regular(inner_id) = partition {
               debug_assert_eq!(
                   partition_id, inner_id,
                   "PartitionDescriptor: partition_id ({}) must match inner id 
in SpatialPartition::Regular ({})",
                   partition_id, inner_id
               );
           }
           Self {
               partition_id,
               partition,
           }
       }
   
       fn regular(partition_id: u32) -> Self {
           Self::new(partition_id, SpatialPartition::Regular(partition_id))
       }
   
       fn multi(partition_id: u32) -> Self {
           Self::new(partition_id, SpatialPartition::Multi)
   ```



##########
rust/sedona-spatial-join/src/probe/partitioned_stream_provider.rs:
##########
@@ -0,0 +1,500 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::ops::DerefMut;
+use std::sync::Arc;
+
+use arrow_schema::SchemaRef;
+use datafusion::config::SpillCompression;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_execution::runtime_env::RuntimeEnv;
+use parking_lot::Mutex;
+
+use crate::probe::first_pass_stream::FirstPassStream;
+use crate::probe::non_partitioned_stream::NonPartitionedStream;
+use crate::probe::ProbeStreamMetrics;
+use crate::{
+    evaluated_batch::evaluated_batch_stream::{
+        external::ExternalEvaluatedBatchStream, SendableEvaluatedBatchStream,
+    },
+    partitioning::{
+        stream_repartitioner::{SpilledPartitions, StreamRepartitioner},
+        PartitionedSide, SpatialPartition, SpatialPartitioner,
+    },
+};
+
+#[derive(Clone)]
+pub(crate) struct ProbeStreamOptions {
+    pub partitioner: Option<Arc<dyn SpatialPartitioner>>,
+    pub target_batch_rows: usize,
+    pub spill_compression: SpillCompression,
+    pub buffer_bytes_threshold: usize,
+    pub spilled_batch_in_memory_size_threshold: Option<usize>,
+}
+
+pub(crate) struct PartitionedProbeStreamProvider {
+    state: Arc<Mutex<ProbeStreamState>>,
+    runtime_env: Arc<RuntimeEnv>,
+    options: ProbeStreamOptions,
+    schema: SchemaRef,
+    metrics: ProbeStreamMetrics,
+}
+
+enum ProbeStreamState {
+    Pending {
+        source: SendableEvaluatedBatchStream,
+    },
+    FirstPass,
+    SubsequentPass {
+        manifest: ProbePartitionManifest,
+    },
+    NonPartitionedConsumed,
+    Failed(Arc<DataFusionError>),
+}
+
+impl PartitionedProbeStreamProvider {
+    pub fn new(
+        runtime_env: Arc<RuntimeEnv>,
+        options: ProbeStreamOptions,
+        source: SendableEvaluatedBatchStream,
+        metrics: ProbeStreamMetrics,
+    ) -> Self {
+        let schema = source.schema();
+        Self {
+            state: Arc::new(Mutex::new(ProbeStreamState::Pending { source })),
+            runtime_env,
+            options,
+            schema,
+            metrics,
+        }
+    }
+
+    pub fn stream_for(&self, partition: SpatialPartition) -> 
Result<SendableEvaluatedBatchStream> {
+        match partition {
+            SpatialPartition::None => Err(DataFusionError::Execution(
+                "SpatialPartition::None should be handled via outer join 
logic".into(),
+            )),
+            SpatialPartition::Regular(0) => self.first_pass_stream(),
+            SpatialPartition::Regular(_) | SpatialPartition::Multi => {
+                if self.options.partitioner.is_none() {
+                    Err(DataFusionError::Execution(
+                        "Non-partitioned probe stream only supports 
Regular(0)".into(),
+                    ))
+                } else {
+                    self.subsequent_pass_stream(partition)
+                }
+            }
+        }
+    }
+
+    fn first_pass_stream(&self) -> Result<SendableEvaluatedBatchStream> {
+        if self.options.partitioner.is_none() {
+            return self.non_partitioned_first_pass_stream();
+        }
+
+        let schema = Arc::clone(&self.schema);
+        let mut state_guard = self.state.lock();
+        match std::mem::replace(&mut *state_guard, 
ProbeStreamState::FirstPass) {
+            ProbeStreamState::Pending { source } => {
+                let partitioner = Arc::clone(
+                    self.options
+                        .partitioner
+                        .as_ref()
+                        .expect("Partitioned first pass requires a 
partitioner"),
+                );
+                let repartitioner = StreamRepartitioner::builder(
+                    Arc::clone(&self.runtime_env),
+                    Arc::clone(&partitioner),
+                    PartitionedSide::ProbeSide,
+                    self.metrics.spill_metrics.clone(),
+                )
+                .spill_compression(self.options.spill_compression)
+                .buffer_bytes_threshold(self.options.buffer_bytes_threshold)
+                .target_batch_size(self.options.target_batch_rows)
+                .spilled_batch_in_memory_size_threshold(
+                    self.options.spilled_batch_in_memory_size_threshold,
+                )
+                .build();
+
+                let state = Arc::clone(&self.state);
+                let callback = move |res: Result<SpilledPartitions>| {
+                    let mut guard = state.lock();
+                    *guard = match res {
+                        Ok(mut spills) => {
+                            let mut s = String::new();
+                            if spills.debug_print(&mut s).is_ok() {
+                                log::debug!("Probe side spilled 
partitions:\n{}", s);
+                            }
+
+                            // Sanity check: Regular(0) and None should be 
empty
+                            let mut check_empty = |partition: 
SpatialPartition| -> Result<()> {
+                                let spilled = 
spills.take_spilled_partition(partition)?;
+                                if !spilled.into_spill_files().is_empty() {
+                                    return 
Err(DataFusionError::Execution(format!(
+                                        "{:?} partition should not have 
spilled data",
+                                        partition
+                                    )));
+                                }
+                                Ok(())
+                            };
+
+                            match check_empty(SpatialPartition::Regular(0))
+                                .and_then(|_| 
check_empty(SpatialPartition::None))
+                            {
+                                Ok(_) => ProbeStreamState::SubsequentPass {
+                                    manifest: 
ProbePartitionManifest::new(schema, spills),
+                                },
+                                Err(err) => 
ProbeStreamState::Failed(Arc::new(err)),
+                            }
+                        }
+                        Err(err) => {
+                            let err_arc = Arc::new(err);
+                            ProbeStreamState::Failed(Arc::clone(&err_arc))
+                        }
+                    };
+                    Ok(())
+                };
+
+                let first_pass = FirstPassStream::new(
+                    source,
+                    repartitioner,
+                    partitioner,
+                    self.metrics.clone(),
+                    callback,
+                );
+                Ok(Box::pin(first_pass))
+            }
+            ProbeStreamState::FirstPass => Err(DataFusionError::Execution(
+                "First pass already running for partitioned probe 
stream".into(),
+            )),
+            ProbeStreamState::SubsequentPass { .. } => 
Err(DataFusionError::Execution(
+                "First pass already completed".into(),
+            )),
+            ProbeStreamState::NonPartitionedConsumed => 
Err(DataFusionError::Execution(
+                "Non-partitioned probe stream already consumed".into(),
+            )),
+            ProbeStreamState::Failed(err) => Err(DataFusionError::Shared(err)),
+        }
+    }
+
+    fn non_partitioned_first_pass_stream(&self) -> 
Result<SendableEvaluatedBatchStream> {
+        let mut state_guard = self.state.lock();
+        match std::mem::replace(&mut *state_guard, 
ProbeStreamState::NonPartitionedConsumed) {
+            ProbeStreamState::Pending { source } => 
Ok(Box::pin(NonPartitionedStream::new(
+                source,
+                self.metrics.clone(),
+            ))),
+            ProbeStreamState::NonPartitionedConsumed => 
Err(DataFusionError::Execution(
+                "Non-partitioned probe stream already consumed".into(),
+            )),
+            ProbeStreamState::Failed(err) => Err(DataFusionError::Shared(err)),
+            _ => Err(DataFusionError::Execution(
+                "Non-partitioned probe stream is not available".into(),
+            )),
+        }
+    }
+
+    fn subsequent_pass_stream(
+        &self,
+        partition: SpatialPartition,
+    ) -> Result<SendableEvaluatedBatchStream> {
+        if self.options.partitioner.is_none() {
+            return Err(DataFusionError::Execution(
+                "Non-partitioned probe stream cannot serve additional 
partitions".into(),
+            ));
+        }
+        let mut locked = self.state.lock();
+        let manifest = match locked.deref_mut() {
+            ProbeStreamState::SubsequentPass { manifest } => manifest,
+            ProbeStreamState::Failed(err) => return 
Err(DataFusionError::Shared(Arc::clone(err))),
+            _ => {
+                return Err(DataFusionError::Execution(
+                    "Partitioned probe stream warm-up not finished".into(),
+                ))
+            }
+        };
+
+        {
+            // let mut manifest = manifest.lock();
+            manifest.stream_for(partition)
+        }
+    }
+
+    pub fn get_partition_row_count(&self, partition: SpatialPartition) -> 
Result<usize> {
+        let mut locked = self.state.lock();
+        let manifest = match locked.deref_mut() {
+            ProbeStreamState::SubsequentPass { manifest } => manifest,
+            ProbeStreamState::Failed(err) => return 
Err(DataFusionError::Shared(Arc::clone(err))),
+            _ => {
+                return Err(DataFusionError::Execution(
+                    "Partitioned probe stream warm-up not finished".into(),
+                ))
+            }
+        };
+        manifest.get_partition_row_count(partition)
+    }
+}
+
+pub struct ProbePartitionManifest {
+    schema: SchemaRef,
+    slots: SpilledPartitions,
+}
+
+impl ProbePartitionManifest {
+    fn new(schema: SchemaRef, spills: SpilledPartitions) -> Self {
+        Self {
+            schema,
+            slots: spills,
+        }
+    }
+
+    fn get_partition_row_count(&self, partition: SpatialPartition) -> 
Result<usize> {
+        let spilled = self.slots.get_spilled_partition(partition)?;
+        Ok(spilled.num_rows())
+    }
+
+    fn stream_for(&mut self, partition: SpatialPartition) -> 
Result<SendableEvaluatedBatchStream> {
+        match partition {
+            SpatialPartition::Regular(0) => Err(DataFusionError::Execution(
+                "Partition 0 is only available during the first pass".into(),
+            )),
+            SpatialPartition::None => Err(DataFusionError::Execution(
+                "SpatialPartition::None should not request a probe 
stream".into(),
+            )),
+            SpatialPartition::Regular(_) => {
+                let spilled = self.slots.take_spilled_partition(partition)?;
+                Ok(Box::pin(
+                    ExternalEvaluatedBatchStream::try_from_spill_files(
+                        Arc::clone(&self.schema),
+                        spilled.into_spill_files(),
+                    )?,
+                ))
+            }
+            SpatialPartition::Multi => {
+                let spilled = self.slots.get_spilled_partition(partition)?;
+                Ok(Box::pin(
+                    ExternalEvaluatedBatchStream::try_from_spill_files(
+                        Arc::clone(&self.schema),
+                        spilled.into_spill_files(),
+                    )?,
+                ))
+            }

Review Comment:
   For the Multi partition, `get_spilled_partition` is used instead of 
`take_spilled_partition`, which allows the Multi partition to be requested 
multiple times (as shown in tests). However, `into_spill_files()` consumes the 
value it is called on, so this pattern only works if `get_spilled_partition` 
returns an owned value (e.g. cloned spill-file handles) rather than a reference 
into the underlying slots. The asymmetry with the Regular partition handling 
(which uses `take_spilled_partition`) could lead to confusion or bugs. Verify 
that this implementation correctly supports multiple consumptions of the Multi 
partition.



##########
rust/sedona-spatial-join/src/exec.rs:
##########
@@ -627,7 +609,8 @@ impl SpatialJoinExec {
 
 #[cfg(test)]
 mod tests {
-    use arrow_array::{Array, RecordBatch};
+    use arrow_array::Array;
+    use arrow_array::RecordBatch;

Review Comment:
   Multiple imports from the same crate should be combined into a single use 
statement with braces for consistency and readability.



##########
rust/sedona-spatial-join/src/probe/first_pass_stream.rs:
##########
@@ -0,0 +1,236 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    collections::VecDeque,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use datafusion_common::{DataFusionError, Result};
+use futures::{Stream, StreamExt};
+use sedona_common::sedona_internal_err;
+
+use crate::probe::ProbeStreamMetrics;
+use crate::{
+    evaluated_batch::{
+        evaluated_batch_stream::{EvaluatedBatchStream, 
SendableEvaluatedBatchStream},
+        EvaluatedBatch,
+    },
+    partitioning::{
+        stream_repartitioner::{
+            assign_rows, interleave_evaluated_batch, SpilledPartitions, 
StreamRepartitioner,
+        },
+        PartitionedSide, SpatialPartition, SpatialPartitioner,
+    },
+};
+
+pub(crate) trait FirstPassStreamCallback {
+    fn call(self, result: Result<SpilledPartitions>) -> Result<()>;
+}
+
+impl<F: FnOnce(Result<SpilledPartitions>) -> Result<()>> 
FirstPassStreamCallback for F {
+    fn call(self, result: Result<SpilledPartitions>) -> Result<()> {
+        self(result)
+    }
+}
+
+pub(crate) struct FirstPassStream<C: FirstPassStreamCallback> {
+    source: SendableEvaluatedBatchStream,
+    repartitioner: Option<StreamRepartitioner>,
+    partitioner: Arc<dyn SpatialPartitioner>,
+    pending_output: VecDeque<Result<EvaluatedBatch>>,
+    metrics: ProbeStreamMetrics,
+    callback: Option<C>,
+}
+
+impl<C: FirstPassStreamCallback> FirstPassStream<C> {
+    pub fn new(
+        source: SendableEvaluatedBatchStream,
+        repartitioner: StreamRepartitioner,
+        partitioner: Arc<dyn SpatialPartitioner>,
+        metrics: ProbeStreamMetrics,
+        callback: C,
+    ) -> Self {
+        Self {
+            source,
+            repartitioner: Some(repartitioner),
+            partitioner,
+            pending_output: VecDeque::new(),
+            metrics,
+            callback: Some(callback),
+        }
+    }
+
+    fn finish_first_pass(&mut self) -> Result<()> {
+        let repartitioner = self.repartitioner.take().ok_or_else(|| {
+            DataFusionError::Internal("First pass repartitioner already 
finished".into())
+        })?;
+        let parts = repartitioner.finish()?;
+        let callback_opt = self.callback.take();
+        match callback_opt {
+            Some(callback) => callback.call(Ok(parts)),
+            None => sedona_internal_err!("Callback has already been called"),
+        }
+    }
+
+    fn transition_to_failed(&mut self, err: DataFusionError) -> 
DataFusionError {
+        let err_arc = Arc::new(err);
+        let callback_opt = self.callback.take();
+        if let Some(callback) = callback_opt {
+            callback
+                .call(Err(DataFusionError::Shared(err_arc.clone())))
+                .ok();

Review Comment:
   The error returned by `callback.call()` is silently discarded with `.ok()`. 
If the callback fails to propagate the error state, this could lead to silent 
failures. Consider logging the callback error or combining it with the original 
error.
   ```suggestion
               if let Err(callback_err) =
                   callback.call(Err(DataFusionError::Shared(err_arc.clone())))
               {
                   eprintln!(
                       "FirstPassStream callback failed while handling error: 
{:?}",
                       callback_err
                   );
               }
   ```



##########
rust/sedona-spatial-join/src/prepare.rs:
##########
@@ -316,6 +292,30 @@ impl SpatialJoinComponentsBuilder {
         Ok(build_partitioner)
     }
 
+    /// Construct a `SpatialPartitioner` (e.g. Flat) from the statistics of 
partitioned build
+    /// side for partitioning the probe side.
+    fn create_spatial_partitioner_for_probe_side(
+        &self,
+        num_partitions: usize,
+        merged_spilled_partitions: &SpilledPartitions,
+    ) -> Result<Arc<dyn SpatialPartitioner>> {
+        let probe_partitioner: Arc<dyn SpatialPartitioner> = {
+            // Build a flat partitioner using these partitions
+            let mut partition_bounds = Vec::with_capacity(num_partitions);
+            for k in 0..num_partitions {
+                let partition = SpatialPartition::Regular(k as u32);
+                let partition_bound = merged_spilled_partitions
+                    .spilled_partition(partition)?
+                    .bounding_box()
+                    .cloned()
+                    .unwrap_or(BoundingBox::empty());
+                partition_bounds.push(partition_bound);
+            }

Review Comment:
   The loop calls `spilled_partition()` for each partition, which may involve 
lookups. Consider collecting all partition bounds in a single pass if the 
underlying data structure supports batch access.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to