Copilot commented on code in PR #527:
URL: https://github.com/apache/sedona-db/pull/527#discussion_r2704356198


##########
rust/sedona-spatial-join/src/partitioning/stream_repartitioner.rs:
##########
@@ -0,0 +1,1350 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Streaming spatial partitioning utilities.
+//!
+//! This module provides helpers that repartition an [`EvaluatedBatch`] stream 
into
+//! spatial spill files using a [`SpatialPartitioner`]. Each regular 
partition, along
+//! with the None and Multi partitions, gets at most one spill file which can
+//! later be replayed via [`SendableEvaluatedBatchStream`].
+
+use std::sync::Arc;
+
+use crate::{
+    evaluated_batch::{
+        evaluated_batch_stream::SendableEvaluatedBatchStream, 
spill::EvaluatedBatchSpillWriter,
+        EvaluatedBatch,
+    },
+    operand_evaluator::EvaluatedGeometryArray,
+    partitioning::{
+        partition_slots::PartitionSlots, util::geo_rect_to_bbox, 
PartitionedSide, SpatialPartition,
+        SpatialPartitioner,
+    },
+};
+use arrow::compute::interleave as arrow_interleave;
+use arrow::compute::interleave_record_batch;
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use datafusion::config::SpillCompression;
+use datafusion_common::{Result, ScalarValue};
+use datafusion_execution::{disk_manager::RefCountedTempFile, 
runtime_env::RuntimeEnv};
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_plan::metrics::SpillMetrics;
+use futures::StreamExt;
+use sedona_common::sedona_internal_err;
+use sedona_expr::statistics::GeoStatistics;
+use sedona_functions::st_analyze_agg::AnalyzeAccumulator;
+use sedona_geometry::bounding_box::BoundingBox;
+use sedona_geometry::interval::IntervalTrait;
+use sedona_schema::datatypes::WKB_GEOMETRY;
+
+/// Result emitted after a stream is spatially repartitioned.
+#[derive(Debug)]
+pub struct SpilledPartitions {
+    slots: PartitionSlots,
+    partitions: Vec<Option<SpilledPartition>>,
+}
+
+/// Spill metadata captured for a single spatial partition.
+#[derive(Debug, Clone)]
+pub struct SpilledPartition {
+    spill_files: Vec<Arc<RefCountedTempFile>>,
+    geo_statistics: GeoStatistics,

Review Comment:
   The `SpilledPartition` struct has only a one-line doc comment, and its
fields are undocumented. Since it is a public API type, consider expanding the
summary to describe how this type represents the metadata and spill files for a
single spatial partition after spilling, and documenting each field.
   ```suggestion
   /// Metadata and spill files produced for a single spatial partition.
   ///
   /// A `SpilledPartition` corresponds to one logical spatial partition (including
   /// special partitions such as `None` or `Multi`) after the stream repartitioner
   /// has flushed in-memory data to disk. It tracks the set of temporary spill
   /// files that hold the partition's rows, along with aggregated geospatial
   /// statistics and the total number of rows written.
   #[derive(Debug, Clone)]
   pub struct SpilledPartition {
       /// Temporary spill files containing the rows assigned to this partition.
       spill_files: Vec<Arc<RefCountedTempFile>>,
       /// Aggregated geospatial statistics computed over all rows in this partition.
       geo_statistics: GeoStatistics,
       /// Total number of rows that were written into `spill_files`.
   ```



##########
rust/sedona-spatial-join/src/partitioning/stream_repartitioner.rs:
##########
@@ -0,0 +1,1350 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Streaming spatial partitioning utilities.
+//!
+//! This module provides helpers that repartition an [`EvaluatedBatch`] stream 
into
+//! spatial spill files using a [`SpatialPartitioner`]. Each regular 
partition, along
+//! with the None and Multi partitions, gets at most one spill file which can
+//! later be replayed via [`SendableEvaluatedBatchStream`].
+
+use std::sync::Arc;
+
+use crate::{
+    evaluated_batch::{
+        evaluated_batch_stream::SendableEvaluatedBatchStream, 
spill::EvaluatedBatchSpillWriter,
+        EvaluatedBatch,
+    },
+    operand_evaluator::EvaluatedGeometryArray,
+    partitioning::{
+        partition_slots::PartitionSlots, util::geo_rect_to_bbox, 
PartitionedSide, SpatialPartition,
+        SpatialPartitioner,
+    },
+};
+use arrow::compute::interleave as arrow_interleave;
+use arrow::compute::interleave_record_batch;
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use datafusion::config::SpillCompression;
+use datafusion_common::{Result, ScalarValue};
+use datafusion_execution::{disk_manager::RefCountedTempFile, 
runtime_env::RuntimeEnv};
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_plan::metrics::SpillMetrics;
+use futures::StreamExt;
+use sedona_common::sedona_internal_err;
+use sedona_expr::statistics::GeoStatistics;
+use sedona_functions::st_analyze_agg::AnalyzeAccumulator;
+use sedona_geometry::bounding_box::BoundingBox;
+use sedona_geometry::interval::IntervalTrait;
+use sedona_schema::datatypes::WKB_GEOMETRY;
+
+/// Result emitted after a stream is spatially repartitioned.
+#[derive(Debug)]
+pub struct SpilledPartitions {
+    slots: PartitionSlots,
+    partitions: Vec<Option<SpilledPartition>>,
+}
+
+/// Spill metadata captured for a single spatial partition.
+#[derive(Debug, Clone)]
+pub struct SpilledPartition {
+    spill_files: Vec<Arc<RefCountedTempFile>>,
+    geo_statistics: GeoStatistics,
+    num_rows: usize,
+}
+
+impl SpilledPartition {
+    /// Construct a spilled partition from finalized spill files and 
aggregated statistics.
+    pub fn new(
+        spill_files: Vec<Arc<RefCountedTempFile>>,
+        geo_statistics: GeoStatistics,
+        num_rows: usize,
+    ) -> Self {
+        Self {
+            spill_files,
+            geo_statistics,
+            num_rows,
+        }
+    }
+
+    /// Create an empty spilled partition (no files, empty stats, zero rows).
+    pub fn empty() -> Self {
+        Self::new(Vec::new(), GeoStatistics::empty(), 0)
+    }
+
+    /// Spill files produced for this partition.
+    pub fn spill_files(&self) -> &[Arc<RefCountedTempFile>] {
+        &self.spill_files
+    }
+
+    /// Aggregated geospatial statistics for this partition.
+    pub fn geo_statistics(&self) -> &GeoStatistics {
+        &self.geo_statistics
+    }
+
+    /// Total number of rows assigned to this partition.
+    pub fn num_rows(&self) -> usize {
+        self.num_rows
+    }
+
+    /// Bounding box, if available from accumulated statistics.
+    pub fn bounding_box(&self) -> Option<&BoundingBox> {
+        self.geo_statistics.bbox()
+    }
+
+    /// Consume this value and return only the spill files.
+    pub fn into_spill_files(self) -> Vec<Arc<RefCountedTempFile>> {
+        self.spill_files
+    }
+
+    /// Consume this value and return `(spill_files, geo_statistics, 
num_rows)`.
+    pub fn into_inner(self) -> (Vec<Arc<RefCountedTempFile>>, GeoStatistics, 
usize) {
+        (self.spill_files, self.geo_statistics, self.num_rows)
+    }
+}
+
+impl SpilledPartitions {
+    /// Construct a new spilled-partitions container for the provided slots.
+    ///
+    /// `partitions` must contain one entry for every slot in `slots`.
+    pub fn new(slots: PartitionSlots, partitions: Vec<SpilledPartition>) -> 
Self {
+        assert_eq!(partitions.len(), slots.total_slots());
+        let partitions = partitions.into_iter().map(Some).collect();
+        Self { slots, partitions }
+    }
+
+    /// Number of regular partitions
+    pub fn num_regular_partitions(&self) -> usize {
+        self.slots.num_regular_partitions()
+    }
+
+    /// Get the slots for mapping spatial partitions to sequential 0-based 
indexes
+    pub fn slots(&self) -> PartitionSlots {
+        self.slots
+    }
+
+    /// Count of spill files that were actually materialized.
+    pub fn spill_file_count(&self) -> usize {
+        self.partitions
+            .iter()
+            .map(|partition| partition.as_ref().map_or(0, |p| 
p.spill_files().len()))
+            .sum()
+    }
+
+    /// Retrieve the spilled partition for a given spatial partition.
+    pub fn spilled_partition(&self, partition: SpatialPartition) -> 
Result<&SpilledPartition> {
+        let Some(slot) = self.slots.slot(partition) else {
+            return sedona_internal_err!(
+                "Invalid partition {:?} for {} regular partitions",
+                partition,
+                self.slots.num_regular_partitions()
+            );
+        };
+        match &self.partitions[slot] {
+            Some(spilled_partition) => Ok(spilled_partition),
+            None => sedona_internal_err!(
+                "Spilled partition {:?} has already been taken away",
+                partition
+            ),
+        }
+    }
+
+    /// Consume this structure into concrete spilled partitions.
+    pub fn into_spilled_partitions(self) -> Result<Vec<SpilledPartition>> {
+        let mut partitions = Vec::with_capacity(self.partitions.len());
+        for partition_opt in self.partitions {
+            match partition_opt {
+                Some(partition) => partitions.push(partition),
+                None => {
+                    return sedona_internal_err!(
+                        "Some of the spilled partitions have already been 
taken away"
+                    )
+                }
+            }
+        }
+        Ok(partitions)
+    }
+
+    /// Get a clone of the spill files in specified partition without 
consuming it. This is
+    /// mainly for retrieving the Multi partition, which may be scanned 
multiple times.
+    pub fn get_spilled_partition(&self, partition: SpatialPartition) -> 
Result<SpilledPartition> {
+        let Some(slot) = self.slots.slot(partition) else {
+            return sedona_internal_err!(
+                "Invalid partition {:?} for {} regular partitions",
+                partition,
+                self.slots.num_regular_partitions()
+            );
+        };
+        match &self.partitions[slot] {
+            Some(spilled_partition) => Ok(spilled_partition.clone()),
+            None => sedona_internal_err!(
+                "Spilled partition {:?} has already been taken away",
+                partition
+            ),
+        }
+    }
+
+    /// Take the spill files in specified partition from it without consuming 
this value. This is
+    /// mainly for retrieving the regular partitions, which will only be 
scanned once.
+    pub fn take_spilled_partition(
+        &mut self,
+        partition: SpatialPartition,
+    ) -> Result<SpilledPartition> {
+        let Some(slot) = self.slots.slot(partition) else {
+            return sedona_internal_err!(
+                "Invalid partition {:?} for {} regular partitions",
+                partition,
+                self.slots.num_regular_partitions()
+            );
+        };
+        match std::mem::take(&mut self.partitions[slot]) {
+            Some(spilled_partition) => Ok(spilled_partition),
+            None => sedona_internal_err!(
+                "Spilled partition {:?} has already been taken away",
+                partition
+            ),
+        }
+    }
+
+    /// Is the spill files still present and can be taken away

Review Comment:
   Corrected grammar from 'Is the spill files still present' to 'Are the spill 
files still present'.
   ```suggestion
       /// Are the spill files still present and can they be taken away?
   ```



##########
rust/sedona-spatial-join/bench/partitioning/stream_repartitioner.rs:
##########
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::{
+    atomic::{AtomicU64, Ordering},
+    Arc,
+};
+use std::time::Duration;
+
+use arrow_array::{
+    ArrayRef, BinaryArray, Date32Array, Int64Array, RecordBatch, StringArray,
+    TimestampMicrosecondArray,
+};
+use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use criterion::{criterion_group, criterion_main, BatchSize, Criterion, 
Throughput};
+use datafusion::config::SpillCompression;
+use datafusion_execution::runtime_env::RuntimeEnv;
+use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
+use futures::executor::block_on;
+use rand::{rngs::StdRng, Rng, SeedableRng};
+use sedona_geometry::{bounding_box::BoundingBox, interval::IntervalTrait};
+use sedona_schema::datatypes::WKB_GEOMETRY;
+use sedona_spatial_join::evaluated_batch::{
+    evaluated_batch_stream::{in_mem::InMemoryEvaluatedBatchStream, 
SendableEvaluatedBatchStream},
+    EvaluatedBatch,
+};
+use sedona_spatial_join::operand_evaluator::EvaluatedGeometryArray;
+use sedona_spatial_join::partitioning::PartitionedSide;
+use sedona_spatial_join::partitioning::{
+    kdb::KDBPartitioner, stream_repartitioner::StreamRepartitioner, 
SpatialPartitioner,
+};
+
+const RNG_SEED: u64 = 0x05ED_04A5;
+const NUM_BATCHES: usize = 50;
+const ROWS_PER_BATCH: usize = 8192;
+const SAMPLE_FOR_PARTITIONER: usize = 1_000;
+const MAX_ITEMS_PER_NODE: usize = 128;
+const MAX_LEVELS: usize = 4;
+const REPARTITIONER_BUFFER_BYTES: usize = 8 * 1024 * 1024;
+
+fn bench_stream_partitioner(c: &mut Criterion) {
+    let extent = Arc::new(default_extent());
+    let partitioner = build_partitioner(extent.as_ref());
+    let schema = Arc::new(build_schema());
+    let runtime_env = Arc::new(RuntimeEnv::default());
+    let metrics_set = ExecutionPlanMetricsSet::new();
+    let spill_metrics = SpillMetrics::new(&metrics_set, 0);
+    let seed_counter = Arc::new(AtomicU64::new(RNG_SEED));
+
+    let mut group = c.benchmark_group("stream_partitioner_repartition");
+    group.throughput(Throughput::Elements((NUM_BATCHES * ROWS_PER_BATCH) as 
u64));
+
+    group.bench_function("kdb_repartition", |b| {
+        let seed_counter = Arc::clone(&seed_counter);
+        let schema = Arc::clone(&schema);
+        let runtime_env = Arc::clone(&runtime_env);
+        let partitioner = Arc::clone(&partitioner);
+        let spill_metrics = spill_metrics.clone();
+        let extent = Arc::clone(&extent);
+
+        b.iter_batched(
+            move || {
+                let seed = seed_counter.fetch_add(1, Ordering::Relaxed);
+                generate_stream(seed, schema.clone(), extent.as_ref())
+            },
+            move |stream| {
+                block_on(async {
+                    StreamRepartitioner::builder(
+                        runtime_env.clone(),
+                        partitioner.clone(),
+                        PartitionedSide::BuildSide,
+                        spill_metrics.clone(),
+                    )
+                    .spill_compression(SpillCompression::Uncompressed)
+                    .buffer_bytes_threshold(REPARTITIONER_BUFFER_BYTES)
+                    .target_batch_size(ROWS_PER_BATCH)
+                    .spilled_batch_in_memory_size_threshold(None)
+                    .build()
+                    .repartition_stream(stream)
+                    .await
+                    .expect("repartition should succeed in benchmark");
+                });
+            },
+            BatchSize::SmallInput,
+        );
+    });
+
+    group.finish();
+}
+
+fn generate_stream(
+    seed: u64,
+    schema: Arc<Schema>,
+    extent: &BoundingBox,
+) -> SendableEvaluatedBatchStream {
+    let mut rng = StdRng::seed_from_u64(seed);
+    let mut batches = Vec::with_capacity(NUM_BATCHES);
+    for _ in 0..NUM_BATCHES {
+        batches.push(random_evaluated_batch(
+            schema.clone(),
+            ROWS_PER_BATCH,
+            extent,
+            &mut rng,
+        ));
+    }
+    in_memory_stream(schema, batches)
+}
+
+fn random_evaluated_batch(
+    schema: Arc<Schema>,
+    rows: usize,
+    extent: &BoundingBox,
+    rng: &mut StdRng,
+) -> EvaluatedBatch {
+    let batch = random_record_batch(schema, rows, rng);
+    let geom_array = random_geometry_array(rows, extent, rng);
+    let geom_array = EvaluatedGeometryArray::try_new(geom_array, &WKB_GEOMETRY)
+        .expect("geometry array allocation should succeed");
+    EvaluatedBatch { batch, geom_array }
+}
+
+fn random_record_batch(schema: Arc<Schema>, rows: usize, rng: &mut StdRng) -> 
RecordBatch {
+    let ids =
+        Int64Array::from_iter_values((0..rows).map(|_| 
rng.random_range(0..1_000_000) as i64));

Review Comment:
   The `as i64` cast is unnecessary: when the range bounds are typed as `i64`,
`random_range` returns an `i64` directly. Consider using
`rng.random_range(0..1_000_000_i64)` instead to avoid the cast.
   ```suggestion
           Int64Array::from_iter_values((0..rows).map(|_| rng.random_range(0..1_000_000_i64)));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to