paleolimbot commented on code in PR #555: URL: https://github.com/apache/sedona-db/pull/555#discussion_r2734539491
########## rust/sedona-spatial-join/src/prepare.rs: ########## @@ -0,0 +1,416 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{mem, sync::Arc}; + +use arrow_schema::SchemaRef; +use datafusion_common::Result; +use datafusion_common_runtime::JoinSet; +use datafusion_execution::{ + disk_manager::RefCountedTempFile, memory_pool::MemoryConsumer, SendableRecordBatchStream, + TaskContext, +}; +use datafusion_expr::JoinType; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use fastrand::Rng; +use sedona_common::{sedona_internal_err, NumSpatialPartitionsConfig, SedonaOptions}; +use sedona_expr::statistics::GeoStatistics; +use sedona_geometry::bounding_box::BoundingBox; + +use crate::{ + index::{ + memory_plan::{compute_memory_plan, PartitionMemorySummary}, + partitioned_index_provider::PartitionedIndexProvider, + BuildSideBatchesCollector, CollectBuildSideMetrics, SpatialJoinBuildMetrics, + }, + partitioning::{ + kdb::KDBPartitioner, + stream_repartitioner::{SpilledPartition, SpilledPartitions, StreamRepartitioner}, + PartitionedSide, SpatialPartition, SpatialPartitioner, + }, + spatial_predicate::SpatialPredicate, + utils::bbox_sampler::BoundingBoxSamples, +}; + +pub(crate) struct SpatialJoinComponents { + pub partitioned_index_provider: Arc<PartitionedIndexProvider>, +} + +#[allow(clippy::too_many_arguments)] +pub(crate) async fn prepare_spatial_join_components( + context: Arc<TaskContext>, + build_schema: SchemaRef, + build_streams: Vec<SendableRecordBatchStream>, + spatial_predicate: SpatialPredicate, + join_type: JoinType, + probe_threads_count: usize, + metrics: ExecutionPlanMetricsSet, + seed: u64, +) -> Result<SpatialJoinComponents> { + let session_config = context.session_config(); + let target_batch_size = session_config.batch_size(); + let sedona_options = session_config + .options() + .extensions + .get::<SedonaOptions>() + .cloned() + .unwrap_or_default(); Review Comment: If it is easy to avoid it may be a good idea to not clone all of the SedonaOptions (lest some day it contain something large). ########## rust/sedona-spatial-join/src/prepare.rs: ########## @@ -0,0 +1,416 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{mem, sync::Arc}; + +use arrow_schema::SchemaRef; +use datafusion_common::Result; +use datafusion_common_runtime::JoinSet; +use datafusion_execution::{ + disk_manager::RefCountedTempFile, memory_pool::MemoryConsumer, SendableRecordBatchStream, + TaskContext, +}; +use datafusion_expr::JoinType; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use fastrand::Rng; +use sedona_common::{sedona_internal_err, NumSpatialPartitionsConfig, SedonaOptions}; +use sedona_expr::statistics::GeoStatistics; +use sedona_geometry::bounding_box::BoundingBox; + +use crate::{ + index::{ + memory_plan::{compute_memory_plan, PartitionMemorySummary}, + partitioned_index_provider::PartitionedIndexProvider, + BuildSideBatchesCollector, CollectBuildSideMetrics, SpatialJoinBuildMetrics, + }, + partitioning::{ + kdb::KDBPartitioner, + stream_repartitioner::{SpilledPartition, SpilledPartitions, StreamRepartitioner}, + PartitionedSide, SpatialPartition, SpatialPartitioner, + }, + spatial_predicate::SpatialPredicate, + utils::bbox_sampler::BoundingBoxSamples, +}; + +pub(crate) struct SpatialJoinComponents { + pub partitioned_index_provider: Arc<PartitionedIndexProvider>, +} + +#[allow(clippy::too_many_arguments)] +pub(crate) async fn prepare_spatial_join_components( + context: Arc<TaskContext>, + build_schema: SchemaRef, + build_streams: Vec<SendableRecordBatchStream>, + spatial_predicate: SpatialPredicate, + join_type: JoinType, + probe_threads_count: usize, + metrics: ExecutionPlanMetricsSet, + seed: u64, +) -> Result<SpatialJoinComponents> { + let session_config = context.session_config(); + let target_batch_size = session_config.batch_size(); + let sedona_options = session_config + .options() + .extensions + .get::<SedonaOptions>() + .cloned() + .unwrap_or_default(); + let concurrent = sedona_options.spatial_join.concurrent_build_side_collection; + let spilled_batch_in_memory_size_threshold = if sedona_options + .spatial_join + .spilled_batch_in_memory_size_threshold + == 0 + { + None + } else { + Some( + sedona_options + .spatial_join + .spilled_batch_in_memory_size_threshold, + ) + }; + let spill_compression = session_config.spill_compression(); + let memory_pool = context.memory_pool(); + let num_partitions = build_streams.len(); + if num_partitions == 0 { + log::debug!("Build side has no data. 
Creating empty spatial index."); + let partitioned_index_provider = PartitionedIndexProvider::new_empty( + build_schema, + spatial_predicate, + sedona_options.spatial_join, + join_type, + probe_threads_count, + SpatialJoinBuildMetrics::new(0, &metrics), + ); + return Ok(SpatialJoinComponents { + partitioned_index_provider: Arc::new(partitioned_index_provider), + }); + } + + let runtime_env = context.runtime_env(); + let collector = BuildSideBatchesCollector::new( + spatial_predicate.clone(), + sedona_options.spatial_join.clone(), + Arc::clone(&runtime_env), + spill_compression, + ); + let mut collect_metrics_vec = Vec::with_capacity(num_partitions); + let mut reservations = Vec::with_capacity(num_partitions); + for k in 0..num_partitions { + let consumer = + MemoryConsumer::new(format!("SpatialJoinCollectBuildSide[{k}]")).with_can_spill(true); + let reservation = consumer.register(memory_pool); + reservations.push(reservation); + collect_metrics_vec.push(CollectBuildSideMetrics::new(k, &metrics)); + } + + let mut rng = Rng::with_seed(seed); + let mut build_partitions = collector + .collect_all( + build_streams, + reservations, + collect_metrics_vec.clone(), + concurrent, + rng.u64(0..0xFFFF), + ) + .await?; + let memory_plan = + compute_memory_plan(build_partitions.iter().map(PartitionMemorySummary::from))?; + let mut memory_plan_str = String::new(); + if memory_plan.debug_print(&mut memory_plan_str).is_ok() { + log::debug!( + "Computed memory plan for spatial join:\n{}", + memory_plan_str + ); + } + let num_partitions = match sedona_options.spatial_join.debug.num_spatial_partitions { + NumSpatialPartitionsConfig::Auto => memory_plan.num_partitions, + NumSpatialPartitionsConfig::Fixed(n) => { + log::debug!("Override number of spatial partitions to {}", n); + n + } + }; + let memory_for_intermittent_usage = match sedona_options + .spatial_join + .debug + .memory_for_intermittent_usage + { + Some(value) => { + log::debug!("Override memory for intermittent usage to {}", value); + value + } + None => memory_plan.memory_for_intermittent_usage, + }; + + if num_partitions == 1 { + log::debug!("Running single-partitioned in-memory spatial join"); + let partitioned_index_provider = PartitionedIndexProvider::new_single_partition( + build_schema, + spatial_predicate, + sedona_options.spatial_join, + join_type, + probe_threads_count, + build_partitions, + SpatialJoinBuildMetrics::new(0, &metrics), + ); + Ok(SpatialJoinComponents { + partitioned_index_provider: Arc::new(partitioned_index_provider), + }) + } else { + let build_partitioner: Arc<dyn SpatialPartitioner> = { + // Use spatial partitioners to partition the build side and the probe side, this will + // reduce the amount of work needed for probing each partitioned index. + // The KDB partitioner is built using the collected bounding box samples. + let mut bbox_samples = BoundingBoxSamples::empty(); + let mut geo_stats = GeoStatistics::empty(); + for partition in &mut build_partitions { + let samples = mem::take(&mut partition.bbox_samples); + bbox_samples = bbox_samples.combine(samples, &mut rng); + geo_stats.merge(&partition.geo_statistics); + } + + let extent = geo_stats + .bbox() + .cloned() + .unwrap_or(BoundingBox::xy((0, 0), (0, 0))); + let extent = if extent.is_empty() { + BoundingBox::xy((0, 0), (0, 0)) + } else { + extent + }; Review Comment: Do we have to assign a finite but meaningless value here? Can the KDBPartitioner accept an empty extent so that the implications of assigning `(0, 0), (0, 0)` arbitrarily are more clear? 
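One way to make that explicit — purely a sketch, assuming a hypothetical change where `KDBPartitioner::build` accepted an `Option<BoundingBox>` for the extent — would be to keep the degenerate case as `None` instead of substituting an arbitrary box:

```rust
// Sketch only: assumes a hypothetical KDBPartitioner::build signature that takes
// Option<BoundingBox> and decides internally how to handle a missing/empty extent.
let extent = geo_stats
    .bbox()
    .cloned()
    .filter(|bbox| !bbox.is_empty()); // None when there is no meaningful extent
let kdb_partitioner =
    KDBPartitioner::build(samples.into_iter(), max_items_per_node, max_levels, extent)?;
```

That keeps the "no usable extent" case visible at the call site rather than encoding it as `(0, 0), (0, 0)`.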
########## rust/sedona-spatial-join/src/stream.rs: ########## @@ -213,16 +232,97 @@ impl SpatialJoinStream { } } - fn wait_build_index( + fn wait_create_spatial_join_components( &mut self, cx: &mut std::task::Context<'_>, ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> { - let index = ready!(self.once_fut_spatial_index.get_shared(cx))?; - self.spatial_index = Some(index); - self.state = SpatialJoinStreamState::FetchProbeBatch; + if self.index_provider.is_none() { + let spatial_join_components = + ready!(self.once_fut_spatial_join_components.get_shared(cx))?; + let provider = Arc::clone(&spatial_join_components.partitioned_index_provider); + self.num_regular_partitions = Some(provider.num_regular_partitions() as u32); + self.index_provider = Some(provider); + } + + let num_partitions = self + .num_regular_partitions + .expect("num_regular_partitions should be available"); + if num_partitions == 0 { + // Usually does not happen. The indexed side should have at least 1 partition. + self.state = SpatialJoinStreamState::Completed; + return Poll::Ready(Ok(StatefulStreamResult::Continue)); + } + + if num_partitions > 1 { + return Poll::Ready(sedona_internal_err!( + "Multi-partitioned spatial join is not supported yet" + )); + } + + self.state = SpatialJoinStreamState::WaitBuildIndex; Poll::Ready(Ok(StatefulStreamResult::Continue)) } + fn wait_build_index( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> { + let num_partitions = self + .num_regular_partitions + .expect("num_regular_partitions should be available"); Review Comment: Should this return a `sedona_internal_err!()`? ########## rust/sedona-spatial-join/src/stream.rs: ########## @@ -213,16 +232,97 @@ impl SpatialJoinStream { } } - fn wait_build_index( + fn wait_create_spatial_join_components( &mut self, cx: &mut std::task::Context<'_>, ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> { - let index = ready!(self.once_fut_spatial_index.get_shared(cx))?; - self.spatial_index = Some(index); - self.state = SpatialJoinStreamState::FetchProbeBatch; + if self.index_provider.is_none() { + let spatial_join_components = + ready!(self.once_fut_spatial_join_components.get_shared(cx))?; + let provider = Arc::clone(&spatial_join_components.partitioned_index_provider); + self.num_regular_partitions = Some(provider.num_regular_partitions() as u32); + self.index_provider = Some(provider); + } + + let num_partitions = self + .num_regular_partitions + .expect("num_regular_partitions should be available"); Review Comment: Should this return a `sedona_internal_err!()`? ########## rust/sedona-spatial-join/src/prepare.rs: ########## @@ -0,0 +1,416 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{mem, sync::Arc}; + +use arrow_schema::SchemaRef; +use datafusion_common::Result; +use datafusion_common_runtime::JoinSet; +use datafusion_execution::{ + disk_manager::RefCountedTempFile, memory_pool::MemoryConsumer, SendableRecordBatchStream, + TaskContext, +}; +use datafusion_expr::JoinType; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use fastrand::Rng; +use sedona_common::{sedona_internal_err, NumSpatialPartitionsConfig, SedonaOptions}; +use sedona_expr::statistics::GeoStatistics; +use sedona_geometry::bounding_box::BoundingBox; + +use crate::{ + index::{ + memory_plan::{compute_memory_plan, PartitionMemorySummary}, + partitioned_index_provider::PartitionedIndexProvider, + BuildSideBatchesCollector, CollectBuildSideMetrics, SpatialJoinBuildMetrics, + }, + partitioning::{ + kdb::KDBPartitioner, + stream_repartitioner::{SpilledPartition, SpilledPartitions, StreamRepartitioner}, + PartitionedSide, SpatialPartition, SpatialPartitioner, + }, + spatial_predicate::SpatialPredicate, + utils::bbox_sampler::BoundingBoxSamples, +}; + +pub(crate) struct SpatialJoinComponents { + pub partitioned_index_provider: Arc<PartitionedIndexProvider>, +} + +#[allow(clippy::too_many_arguments)] +pub(crate) async fn prepare_spatial_join_components( + context: Arc<TaskContext>, + build_schema: SchemaRef, + build_streams: Vec<SendableRecordBatchStream>, + spatial_predicate: SpatialPredicate, + join_type: JoinType, + probe_threads_count: usize, + metrics: ExecutionPlanMetricsSet, + seed: u64, +) -> Result<SpatialJoinComponents> { Review Comment: You don't have to do this if it is too much work, but this is a very long function that does quite a lot that has a lot of arguments. Would it be cleaner to do something like: ```rust struct SpatialJoinComponentsBuilder { pub context: Arc<TaskContext>, pub build_schema: SchemaRef, pub build_streams: Vec<SendableRecordBatchStream>, pub spatial_predicate: SpatialPredicate, pub join_type: JoinType, pub probe_threads_count: usize, pub metrics: ExecutionPlanMetricsSet, pub seed: u64, } impl SpatialJoinComponentsBuilder { pub fn build(self) -> Result<SpatialJoinComponents> { let sedona_options = self.collect_options(); if self.build_streams.is_empty() { return self.empty_components(); } let (memory, stuff) = self.configure_reservations(); let build_partitions = self.collect_build_partitions().await?; if num_partitions == 1 { return self.single_partition_components(); } let build_partitioner = self.make_partitioner(); let mut partitioned_spill_files_vec = self.collect_build_side_in_parallel().await?; self.merge_spilled_partitions(&partitioned_spill_files_vec)?; self.sanity_check_partitions(); self.multi_partition_components() } } ``` Totally understandable if there's too much intermediate state between those steps that needs to be shared to split that up...what is being done here is a very tricky thing to accomplish and I get that at some point it just has to be done. In the absence of function names to help, perhaps at least some comments around what this is doing, which seems like a critical high-level piece of the out of core implementation! 
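For what it's worth, the call site would then collapse to something like the following (sketch only, reusing the hypothetical `SpatialJoinComponentsBuilder` from the suggestion above; `build` would presumably need to be `async` since the sketched steps are awaited, and the field names simply mirror the existing function arguments):

```rust
// Sketch: construct the hypothetical builder from the same values that are
// currently passed as function arguments, then run the whole preparation.
let components = SpatialJoinComponentsBuilder {
    context,
    build_schema,
    build_streams,
    spatial_predicate,
    join_type,
    probe_threads_count,
    metrics,
    seed,
}
.build()
.await?;
```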
########## rust/sedona-spatial-join/src/stream.rs: ########## @@ -213,16 +232,97 @@ impl SpatialJoinStream { } } - fn wait_build_index( + fn wait_create_spatial_join_components( &mut self, cx: &mut std::task::Context<'_>, ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> { - let index = ready!(self.once_fut_spatial_index.get_shared(cx))?; - self.spatial_index = Some(index); - self.state = SpatialJoinStreamState::FetchProbeBatch; + if self.index_provider.is_none() { + let spatial_join_components = + ready!(self.once_fut_spatial_join_components.get_shared(cx))?; + let provider = Arc::clone(&spatial_join_components.partitioned_index_provider); + self.num_regular_partitions = Some(provider.num_regular_partitions() as u32); + self.index_provider = Some(provider); + } + + let num_partitions = self + .num_regular_partitions + .expect("num_regular_partitions should be available"); + if num_partitions == 0 { + // Usually does not happen. The indexed side should have at least 1 partition. + self.state = SpatialJoinStreamState::Completed; + return Poll::Ready(Ok(StatefulStreamResult::Continue)); + } + + if num_partitions > 1 { + return Poll::Ready(sedona_internal_err!( + "Multi-partitioned spatial join is not supported yet" + )); + } + + self.state = SpatialJoinStreamState::WaitBuildIndex; Poll::Ready(Ok(StatefulStreamResult::Continue)) } + fn wait_build_index( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> { + let num_partitions = self + .num_regular_partitions + .expect("num_regular_partitions should be available"); + let partition_id = 0; + if partition_id >= num_partitions { + self.state = SpatialJoinStreamState::Completed; + return Poll::Ready(Ok(StatefulStreamResult::Continue)); + } + + if self.pending_index_future.is_none() { + let provider = Arc::clone( + self.index_provider + .as_ref() + .expect("Partitioned index provider should be available"), Review Comment: Should this return a `sedona_internal_err!()`? ########## rust/sedona-spatial-join/src/stream.rs: ########## @@ -213,16 +232,97 @@ impl SpatialJoinStream { } } - fn wait_build_index( + fn wait_create_spatial_join_components( &mut self, cx: &mut std::task::Context<'_>, ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> { - let index = ready!(self.once_fut_spatial_index.get_shared(cx))?; - self.spatial_index = Some(index); - self.state = SpatialJoinStreamState::FetchProbeBatch; + if self.index_provider.is_none() { + let spatial_join_components = + ready!(self.once_fut_spatial_join_components.get_shared(cx))?; + let provider = Arc::clone(&spatial_join_components.partitioned_index_provider); + self.num_regular_partitions = Some(provider.num_regular_partitions() as u32); + self.index_provider = Some(provider); + } + + let num_partitions = self + .num_regular_partitions + .expect("num_regular_partitions should be available"); + if num_partitions == 0 { + // Usually does not happen. The indexed side should have at least 1 partition. 
+ self.state = SpatialJoinStreamState::Completed; + return Poll::Ready(Ok(StatefulStreamResult::Continue)); + } + + if num_partitions > 1 { + return Poll::Ready(sedona_internal_err!( + "Multi-partitioned spatial join is not supported yet" + )); + } + + self.state = SpatialJoinStreamState::WaitBuildIndex; Poll::Ready(Ok(StatefulStreamResult::Continue)) } + fn wait_build_index( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> { + let num_partitions = self + .num_regular_partitions + .expect("num_regular_partitions should be available"); + let partition_id = 0; + if partition_id >= num_partitions { + self.state = SpatialJoinStreamState::Completed; + return Poll::Ready(Ok(StatefulStreamResult::Continue)); + } + + if self.pending_index_future.is_none() { + let provider = Arc::clone( + self.index_provider + .as_ref() + .expect("Partitioned index provider should be available"), + ); + let future = { + log::debug!( + "[Partition {}] Building index for spatial partition {}", + self.probe_partition_id, + partition_id + ); + async move { provider.build_or_wait_for_index(partition_id).await }.boxed() + }; + self.pending_index_future = Some(future); + } + + let future = self + .pending_index_future + .as_mut() + .expect("pending future must exist"); Review Comment: Should this return a `sedona_internal_err!()`? ########## rust/sedona-spatial-join/src/prepare.rs: ########## @@ -0,0 +1,416 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{mem, sync::Arc}; + +use arrow_schema::SchemaRef; +use datafusion_common::Result; +use datafusion_common_runtime::JoinSet; +use datafusion_execution::{ + disk_manager::RefCountedTempFile, memory_pool::MemoryConsumer, SendableRecordBatchStream, + TaskContext, +}; +use datafusion_expr::JoinType; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use fastrand::Rng; +use sedona_common::{sedona_internal_err, NumSpatialPartitionsConfig, SedonaOptions}; +use sedona_expr::statistics::GeoStatistics; +use sedona_geometry::bounding_box::BoundingBox; + +use crate::{ + index::{ + memory_plan::{compute_memory_plan, PartitionMemorySummary}, + partitioned_index_provider::PartitionedIndexProvider, + BuildSideBatchesCollector, CollectBuildSideMetrics, SpatialJoinBuildMetrics, + }, + partitioning::{ + kdb::KDBPartitioner, + stream_repartitioner::{SpilledPartition, SpilledPartitions, StreamRepartitioner}, + PartitionedSide, SpatialPartition, SpatialPartitioner, + }, + spatial_predicate::SpatialPredicate, + utils::bbox_sampler::BoundingBoxSamples, +}; + +pub(crate) struct SpatialJoinComponents { + pub partitioned_index_provider: Arc<PartitionedIndexProvider>, +} + +#[allow(clippy::too_many_arguments)] +pub(crate) async fn prepare_spatial_join_components( + context: Arc<TaskContext>, + build_schema: SchemaRef, + build_streams: Vec<SendableRecordBatchStream>, + spatial_predicate: SpatialPredicate, + join_type: JoinType, + probe_threads_count: usize, + metrics: ExecutionPlanMetricsSet, + seed: u64, +) -> Result<SpatialJoinComponents> { + let session_config = context.session_config(); + let target_batch_size = session_config.batch_size(); + let sedona_options = session_config + .options() + .extensions + .get::<SedonaOptions>() + .cloned() + .unwrap_or_default(); + let concurrent = sedona_options.spatial_join.concurrent_build_side_collection; + let spilled_batch_in_memory_size_threshold = if sedona_options + .spatial_join + .spilled_batch_in_memory_size_threshold + == 0 + { + None + } else { + Some( + sedona_options + .spatial_join + .spilled_batch_in_memory_size_threshold, + ) + }; + let spill_compression = session_config.spill_compression(); + let memory_pool = context.memory_pool(); + let num_partitions = build_streams.len(); + if num_partitions == 0 { + log::debug!("Build side has no data. 
Creating empty spatial index."); + let partitioned_index_provider = PartitionedIndexProvider::new_empty( + build_schema, + spatial_predicate, + sedona_options.spatial_join, + join_type, + probe_threads_count, + SpatialJoinBuildMetrics::new(0, &metrics), + ); + return Ok(SpatialJoinComponents { + partitioned_index_provider: Arc::new(partitioned_index_provider), + }); + } + + let runtime_env = context.runtime_env(); + let collector = BuildSideBatchesCollector::new( + spatial_predicate.clone(), + sedona_options.spatial_join.clone(), + Arc::clone(&runtime_env), + spill_compression, + ); + let mut collect_metrics_vec = Vec::with_capacity(num_partitions); + let mut reservations = Vec::with_capacity(num_partitions); + for k in 0..num_partitions { + let consumer = + MemoryConsumer::new(format!("SpatialJoinCollectBuildSide[{k}]")).with_can_spill(true); + let reservation = consumer.register(memory_pool); + reservations.push(reservation); + collect_metrics_vec.push(CollectBuildSideMetrics::new(k, &metrics)); + } + + let mut rng = Rng::with_seed(seed); + let mut build_partitions = collector + .collect_all( + build_streams, + reservations, + collect_metrics_vec.clone(), + concurrent, + rng.u64(0..0xFFFF), + ) + .await?; + let memory_plan = + compute_memory_plan(build_partitions.iter().map(PartitionMemorySummary::from))?; + let mut memory_plan_str = String::new(); + if memory_plan.debug_print(&mut memory_plan_str).is_ok() { + log::debug!( + "Computed memory plan for spatial join:\n{}", + memory_plan_str + ); + } + let num_partitions = match sedona_options.spatial_join.debug.num_spatial_partitions { + NumSpatialPartitionsConfig::Auto => memory_plan.num_partitions, + NumSpatialPartitionsConfig::Fixed(n) => { + log::debug!("Override number of spatial partitions to {}", n); + n + } + }; + let memory_for_intermittent_usage = match sedona_options + .spatial_join + .debug + .memory_for_intermittent_usage + { + Some(value) => { + log::debug!("Override memory for intermittent usage to {}", value); + value + } + None => memory_plan.memory_for_intermittent_usage, + }; + + if num_partitions == 1 { + log::debug!("Running single-partitioned in-memory spatial join"); + let partitioned_index_provider = PartitionedIndexProvider::new_single_partition( + build_schema, + spatial_predicate, + sedona_options.spatial_join, + join_type, + probe_threads_count, + build_partitions, + SpatialJoinBuildMetrics::new(0, &metrics), + ); + Ok(SpatialJoinComponents { + partitioned_index_provider: Arc::new(partitioned_index_provider), + }) + } else { + let build_partitioner: Arc<dyn SpatialPartitioner> = { + // Use spatial partitioners to partition the build side and the probe side, this will + // reduce the amount of work needed for probing each partitioned index. + // The KDB partitioner is built using the collected bounding box samples. 
+ let mut bbox_samples = BoundingBoxSamples::empty(); + let mut geo_stats = GeoStatistics::empty(); + for partition in &mut build_partitions { + let samples = mem::take(&mut partition.bbox_samples); + bbox_samples = bbox_samples.combine(samples, &mut rng); + geo_stats.merge(&partition.geo_statistics); + } + + let extent = geo_stats + .bbox() + .cloned() + .unwrap_or(BoundingBox::xy((0, 0), (0, 0))); + let extent = if extent.is_empty() { + BoundingBox::xy((0, 0), (0, 0)) + } else { + extent + }; + let mut samples = bbox_samples.take_samples(); + let max_items_per_node = 1.max(samples.len() / num_partitions); + let max_levels = num_partitions; + + log::debug!( + "Number of samples: {}, max_items_per_node: {}, max_levels: {}", + samples.len(), + max_items_per_node, + max_levels + ); + rng.shuffle(&mut samples); + let kdb_partitioner = + KDBPartitioner::build(samples.into_iter(), max_items_per_node, max_levels, extent)?; + log::debug!( + "Built KDB spatial partitioner with {} partitions", + num_partitions + ); + let mut kdb_dbg_str = String::new(); + if kdb_partitioner.debug_print(&mut kdb_dbg_str).is_ok() { + log::debug!("KDB partitioner debug info:\n{}", kdb_dbg_str); + } + + Arc::new(kdb_partitioner) + }; + + let num_partitions = build_partitioner.num_regular_partitions(); + log::debug!("Actual number of spatial partitions: {}", num_partitions); + + // Spawn each task for each build partition to repartition the data using the spatial partitioner for + // the build/indexed side + let mut join_set = JoinSet::new(); + let buffer_bytes_threshold = memory_for_intermittent_usage / build_partitions.len(); + let mut reservations = Vec::with_capacity(build_partitions.len()); + for (k, partition) in build_partitions.into_iter().enumerate() { + let stream = partition.build_side_batch_stream; + let reservation = partition.reservation; + let metrics = &collect_metrics_vec[k]; + let spill_metrics = metrics.spill_metrics(); + let runtime_env = Arc::clone(&runtime_env); + let partitioner = Arc::clone(&build_partitioner); + join_set.spawn(async move { + let partitioned_spill_files = StreamRepartitioner::builder( + runtime_env, + partitioner, + PartitionedSide::BuildSide, + spill_metrics, + ) + .spill_compression(spill_compression) + .buffer_bytes_threshold(buffer_bytes_threshold) + .target_batch_size(target_batch_size) + .spilled_batch_in_memory_size_threshold(spilled_batch_in_memory_size_threshold) + .build() + .repartition_stream(stream) + .await; + partitioned_spill_files + }); + reservations.push(reservation); + } + + let results = join_set.join_all().await; + let mut partitioned_spill_files_vec = Vec::with_capacity(results.len()); + for result in results { + partitioned_spill_files_vec.push(result?); + } Review Comment: Not sure if it's possible/cleaner, but maybe: ```suggestion let mut partitioned_spill_files_vec = results.into_iter().collect::<Result<Vec<_>>>(); ``` ########## rust/sedona-spatial-join/src/utils/disposable_async_cell.rs: ########## @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt; + +use parking_lot::Mutex; +use tokio::sync::Notify; + +/// Error returned when writing to a [`DisposableAsyncCell`] fails. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum CellSetError { + /// The cell has already been disposed, so new values are rejected. + Disposed, + + /// The cell already has a value. + AlreadySet, +} + +/// An asynchronous cell that can be set at most once before either being +/// disposed or read by any number of waiters. +/// +/// Awaiters calling [`DisposableAsyncCell::get`] will park until a value is set +/// or the cell is disposed. Once disposed, `get` returns `None` and `set` +/// returns [`CellSetError::Disposed`]. Review Comment: I trust that you need this, but perhaps add to this comment what this is used for? ########## rust/sedona-spatial-join/src/index/memory_plan.rs: ########## @@ -0,0 +1,217 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cmp::max; + +use datafusion_common::{DataFusionError, Result}; + +use super::BuildPartition; + +/// The memory accounting summary of a build side partition. This is collected +/// during the build side collection phase and used to estimate the memory usage for +/// running spatial join. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) struct PartitionMemorySummary { + /// Number of rows in the partition. + pub num_rows: usize, + /// The total memory reserved when collecting this build side partition. + pub reserved_memory: usize, + /// The estimated memory usage for building the spatial index for all the data in + /// this build side partition. + pub estimated_index_memory_usage: usize, +} + +impl From<&BuildPartition> for PartitionMemorySummary { + fn from(partition: &BuildPartition) -> Self { + Self { + num_rows: partition.num_rows, + reserved_memory: partition.reservation.size(), + estimated_index_memory_usage: partition.estimated_spatial_index_memory_usage, + } + } +} + +/// A detailed plan for memory usage during spatial join execution. The spatial join +/// could be spatial-partitioned if the reserved memory is not sufficient to hold the +/// entire spatial index. +#[derive(Debug, PartialEq, Eq)] +pub(crate) struct MemoryPlan { + /// The total number of rows in the build side. + pub num_rows: usize, + /// The total memory reserved for the build side. 
+ pub reserved_memory: usize, + /// The estimated memory usage for building the spatial index for the entire build side. + /// It could be larger than [`Self::reserved_memory`], and in that case we need to + /// partition the build side using spatial partitioning. + pub estimated_index_memory_usage: usize, + /// The memory budget for holding the spatial index. If the spatial join is partitioned, + /// this is the memory budget for holding the spatial index of a single partition. + pub memory_for_spatial_index: usize, + /// The memory budget for intermittent usage, such as buffering data during repartitioning. + pub memory_for_intermittent_usage: usize, + /// The number of spatial partitions to split the build side into. + pub num_partitions: usize, +} + +impl MemoryPlan { + /// Write debug info for this memory plan + pub fn debug_print(&self, f: &mut impl std::fmt::Write) -> std::fmt::Result { + writeln!(f, "Memory Plan:")?; Review Comment: Is this better than `#[derive(Debug)]` + `format!("self:#?")` ########## rust/sedona-spatial-join/src/prepare.rs: ########## @@ -0,0 +1,416 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{mem, sync::Arc}; + +use arrow_schema::SchemaRef; +use datafusion_common::Result; +use datafusion_common_runtime::JoinSet; +use datafusion_execution::{ + disk_manager::RefCountedTempFile, memory_pool::MemoryConsumer, SendableRecordBatchStream, + TaskContext, +}; +use datafusion_expr::JoinType; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use fastrand::Rng; +use sedona_common::{sedona_internal_err, NumSpatialPartitionsConfig, SedonaOptions}; +use sedona_expr::statistics::GeoStatistics; +use sedona_geometry::bounding_box::BoundingBox; + +use crate::{ + index::{ + memory_plan::{compute_memory_plan, PartitionMemorySummary}, + partitioned_index_provider::PartitionedIndexProvider, + BuildSideBatchesCollector, CollectBuildSideMetrics, SpatialJoinBuildMetrics, + }, + partitioning::{ + kdb::KDBPartitioner, + stream_repartitioner::{SpilledPartition, SpilledPartitions, StreamRepartitioner}, + PartitionedSide, SpatialPartition, SpatialPartitioner, + }, + spatial_predicate::SpatialPredicate, + utils::bbox_sampler::BoundingBoxSamples, +}; + +pub(crate) struct SpatialJoinComponents { + pub partitioned_index_provider: Arc<PartitionedIndexProvider>, +} + +#[allow(clippy::too_many_arguments)] +pub(crate) async fn prepare_spatial_join_components( + context: Arc<TaskContext>, + build_schema: SchemaRef, + build_streams: Vec<SendableRecordBatchStream>, + spatial_predicate: SpatialPredicate, + join_type: JoinType, + probe_threads_count: usize, + metrics: ExecutionPlanMetricsSet, + seed: u64, +) -> Result<SpatialJoinComponents> { + let session_config = context.session_config(); + let target_batch_size = session_config.batch_size(); + let sedona_options = session_config + .options() + .extensions + .get::<SedonaOptions>() + .cloned() + .unwrap_or_default(); + let concurrent = sedona_options.spatial_join.concurrent_build_side_collection; + let spilled_batch_in_memory_size_threshold = if sedona_options + .spatial_join + .spilled_batch_in_memory_size_threshold + == 0 + { + None + } else { + Some( + sedona_options + .spatial_join + .spilled_batch_in_memory_size_threshold, + ) + }; + let spill_compression = session_config.spill_compression(); + let memory_pool = context.memory_pool(); + let num_partitions = build_streams.len(); + if num_partitions == 0 { + log::debug!("Build side has no data. 
Creating empty spatial index."); + let partitioned_index_provider = PartitionedIndexProvider::new_empty( + build_schema, + spatial_predicate, + sedona_options.spatial_join, + join_type, + probe_threads_count, + SpatialJoinBuildMetrics::new(0, &metrics), + ); + return Ok(SpatialJoinComponents { + partitioned_index_provider: Arc::new(partitioned_index_provider), + }); + } + + let runtime_env = context.runtime_env(); + let collector = BuildSideBatchesCollector::new( + spatial_predicate.clone(), + sedona_options.spatial_join.clone(), + Arc::clone(&runtime_env), + spill_compression, + ); + let mut collect_metrics_vec = Vec::with_capacity(num_partitions); + let mut reservations = Vec::with_capacity(num_partitions); + for k in 0..num_partitions { + let consumer = + MemoryConsumer::new(format!("SpatialJoinCollectBuildSide[{k}]")).with_can_spill(true); + let reservation = consumer.register(memory_pool); + reservations.push(reservation); + collect_metrics_vec.push(CollectBuildSideMetrics::new(k, &metrics)); + } + + let mut rng = Rng::with_seed(seed); + let mut build_partitions = collector + .collect_all( + build_streams, + reservations, + collect_metrics_vec.clone(), + concurrent, + rng.u64(0..0xFFFF), + ) + .await?; + let memory_plan = + compute_memory_plan(build_partitions.iter().map(PartitionMemorySummary::from))?; + let mut memory_plan_str = String::new(); + if memory_plan.debug_print(&mut memory_plan_str).is_ok() { + log::debug!( + "Computed memory plan for spatial join:\n{}", + memory_plan_str + ); + } + let num_partitions = match sedona_options.spatial_join.debug.num_spatial_partitions { + NumSpatialPartitionsConfig::Auto => memory_plan.num_partitions, + NumSpatialPartitionsConfig::Fixed(n) => { + log::debug!("Override number of spatial partitions to {}", n); + n + } + }; + let memory_for_intermittent_usage = match sedona_options + .spatial_join + .debug + .memory_for_intermittent_usage + { + Some(value) => { + log::debug!("Override memory for intermittent usage to {}", value); + value + } + None => memory_plan.memory_for_intermittent_usage, + }; + + if num_partitions == 1 { + log::debug!("Running single-partitioned in-memory spatial join"); + let partitioned_index_provider = PartitionedIndexProvider::new_single_partition( + build_schema, + spatial_predicate, + sedona_options.spatial_join, + join_type, + probe_threads_count, + build_partitions, + SpatialJoinBuildMetrics::new(0, &metrics), + ); + Ok(SpatialJoinComponents { + partitioned_index_provider: Arc::new(partitioned_index_provider), + }) + } else { + let build_partitioner: Arc<dyn SpatialPartitioner> = { + // Use spatial partitioners to partition the build side and the probe side, this will + // reduce the amount of work needed for probing each partitioned index. + // The KDB partitioner is built using the collected bounding box samples. 
+ let mut bbox_samples = BoundingBoxSamples::empty(); + let mut geo_stats = GeoStatistics::empty(); + for partition in &mut build_partitions { + let samples = mem::take(&mut partition.bbox_samples); + bbox_samples = bbox_samples.combine(samples, &mut rng); + geo_stats.merge(&partition.geo_statistics); + } + + let extent = geo_stats + .bbox() + .cloned() + .unwrap_or(BoundingBox::xy((0, 0), (0, 0))); + let extent = if extent.is_empty() { + BoundingBox::xy((0, 0), (0, 0)) + } else { + extent + }; + let mut samples = bbox_samples.take_samples(); + let max_items_per_node = 1.max(samples.len() / num_partitions); + let max_levels = num_partitions; + + log::debug!( + "Number of samples: {}, max_items_per_node: {}, max_levels: {}", + samples.len(), + max_items_per_node, + max_levels + ); + rng.shuffle(&mut samples); + let kdb_partitioner = + KDBPartitioner::build(samples.into_iter(), max_items_per_node, max_levels, extent)?; + log::debug!( + "Built KDB spatial partitioner with {} partitions", + num_partitions + ); + let mut kdb_dbg_str = String::new(); + if kdb_partitioner.debug_print(&mut kdb_dbg_str).is_ok() { + log::debug!("KDB partitioner debug info:\n{}", kdb_dbg_str); + } Review Comment: Should this/is this going to compute a debug string even in release mode? ########## rust/sedona-spatial-join/src/prepare.rs: ########## @@ -0,0 +1,416 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{mem, sync::Arc}; + +use arrow_schema::SchemaRef; +use datafusion_common::Result; +use datafusion_common_runtime::JoinSet; +use datafusion_execution::{ + disk_manager::RefCountedTempFile, memory_pool::MemoryConsumer, SendableRecordBatchStream, + TaskContext, +}; +use datafusion_expr::JoinType; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use fastrand::Rng; +use sedona_common::{sedona_internal_err, NumSpatialPartitionsConfig, SedonaOptions}; +use sedona_expr::statistics::GeoStatistics; +use sedona_geometry::bounding_box::BoundingBox; + +use crate::{ + index::{ + memory_plan::{compute_memory_plan, PartitionMemorySummary}, + partitioned_index_provider::PartitionedIndexProvider, + BuildSideBatchesCollector, CollectBuildSideMetrics, SpatialJoinBuildMetrics, + }, + partitioning::{ + kdb::KDBPartitioner, + stream_repartitioner::{SpilledPartition, SpilledPartitions, StreamRepartitioner}, + PartitionedSide, SpatialPartition, SpatialPartitioner, + }, + spatial_predicate::SpatialPredicate, + utils::bbox_sampler::BoundingBoxSamples, +}; + +pub(crate) struct SpatialJoinComponents { + pub partitioned_index_provider: Arc<PartitionedIndexProvider>, +} + +#[allow(clippy::too_many_arguments)] +pub(crate) async fn prepare_spatial_join_components( + context: Arc<TaskContext>, + build_schema: SchemaRef, + build_streams: Vec<SendableRecordBatchStream>, + spatial_predicate: SpatialPredicate, + join_type: JoinType, + probe_threads_count: usize, + metrics: ExecutionPlanMetricsSet, + seed: u64, +) -> Result<SpatialJoinComponents> { + let session_config = context.session_config(); + let target_batch_size = session_config.batch_size(); + let sedona_options = session_config + .options() + .extensions + .get::<SedonaOptions>() + .cloned() + .unwrap_or_default(); + let concurrent = sedona_options.spatial_join.concurrent_build_side_collection; + let spilled_batch_in_memory_size_threshold = if sedona_options + .spatial_join + .spilled_batch_in_memory_size_threshold + == 0 + { + None + } else { + Some( + sedona_options + .spatial_join + .spilled_batch_in_memory_size_threshold, + ) + }; + let spill_compression = session_config.spill_compression(); + let memory_pool = context.memory_pool(); + let num_partitions = build_streams.len(); + if num_partitions == 0 { + log::debug!("Build side has no data. 
Creating empty spatial index."); + let partitioned_index_provider = PartitionedIndexProvider::new_empty( + build_schema, + spatial_predicate, + sedona_options.spatial_join, + join_type, + probe_threads_count, + SpatialJoinBuildMetrics::new(0, &metrics), + ); + return Ok(SpatialJoinComponents { + partitioned_index_provider: Arc::new(partitioned_index_provider), + }); + } + + let runtime_env = context.runtime_env(); + let collector = BuildSideBatchesCollector::new( + spatial_predicate.clone(), + sedona_options.spatial_join.clone(), + Arc::clone(&runtime_env), + spill_compression, + ); + let mut collect_metrics_vec = Vec::with_capacity(num_partitions); + let mut reservations = Vec::with_capacity(num_partitions); + for k in 0..num_partitions { + let consumer = + MemoryConsumer::new(format!("SpatialJoinCollectBuildSide[{k}]")).with_can_spill(true); + let reservation = consumer.register(memory_pool); + reservations.push(reservation); + collect_metrics_vec.push(CollectBuildSideMetrics::new(k, &metrics)); + } + + let mut rng = Rng::with_seed(seed); + let mut build_partitions = collector + .collect_all( + build_streams, + reservations, + collect_metrics_vec.clone(), + concurrent, + rng.u64(0..0xFFFF), + ) + .await?; + let memory_plan = + compute_memory_plan(build_partitions.iter().map(PartitionMemorySummary::from))?; + let mut memory_plan_str = String::new(); + if memory_plan.debug_print(&mut memory_plan_str).is_ok() { + log::debug!( + "Computed memory plan for spatial join:\n{}", + memory_plan_str + ); + } + let num_partitions = match sedona_options.spatial_join.debug.num_spatial_partitions { + NumSpatialPartitionsConfig::Auto => memory_plan.num_partitions, + NumSpatialPartitionsConfig::Fixed(n) => { + log::debug!("Override number of spatial partitions to {}", n); + n + } + }; + let memory_for_intermittent_usage = match sedona_options + .spatial_join + .debug + .memory_for_intermittent_usage + { + Some(value) => { + log::debug!("Override memory for intermittent usage to {}", value); + value + } + None => memory_plan.memory_for_intermittent_usage, + }; + + if num_partitions == 1 { + log::debug!("Running single-partitioned in-memory spatial join"); + let partitioned_index_provider = PartitionedIndexProvider::new_single_partition( + build_schema, + spatial_predicate, + sedona_options.spatial_join, + join_type, + probe_threads_count, + build_partitions, + SpatialJoinBuildMetrics::new(0, &metrics), + ); + Ok(SpatialJoinComponents { + partitioned_index_provider: Arc::new(partitioned_index_provider), + }) + } else { + let build_partitioner: Arc<dyn SpatialPartitioner> = { + // Use spatial partitioners to partition the build side and the probe side, this will + // reduce the amount of work needed for probing each partitioned index. + // The KDB partitioner is built using the collected bounding box samples. 
+ let mut bbox_samples = BoundingBoxSamples::empty(); + let mut geo_stats = GeoStatistics::empty(); + for partition in &mut build_partitions { + let samples = mem::take(&mut partition.bbox_samples); + bbox_samples = bbox_samples.combine(samples, &mut rng); + geo_stats.merge(&partition.geo_statistics); + } + + let extent = geo_stats + .bbox() + .cloned() + .unwrap_or(BoundingBox::xy((0, 0), (0, 0))); + let extent = if extent.is_empty() { + BoundingBox::xy((0, 0), (0, 0)) + } else { + extent + }; + let mut samples = bbox_samples.take_samples(); + let max_items_per_node = 1.max(samples.len() / num_partitions); + let max_levels = num_partitions; + + log::debug!( + "Number of samples: {}, max_items_per_node: {}, max_levels: {}", + samples.len(), + max_items_per_node, + max_levels + ); + rng.shuffle(&mut samples); + let kdb_partitioner = + KDBPartitioner::build(samples.into_iter(), max_items_per_node, max_levels, extent)?; + log::debug!( + "Built KDB spatial partitioner with {} partitions", + num_partitions + ); + let mut kdb_dbg_str = String::new(); + if kdb_partitioner.debug_print(&mut kdb_dbg_str).is_ok() { + log::debug!("KDB partitioner debug info:\n{}", kdb_dbg_str); + } + + Arc::new(kdb_partitioner) + }; + + let num_partitions = build_partitioner.num_regular_partitions(); + log::debug!("Actual number of spatial partitions: {}", num_partitions); + + // Spawn each task for each build partition to repartition the data using the spatial partitioner for + // the build/indexed side + let mut join_set = JoinSet::new(); + let buffer_bytes_threshold = memory_for_intermittent_usage / build_partitions.len(); + let mut reservations = Vec::with_capacity(build_partitions.len()); + for (k, partition) in build_partitions.into_iter().enumerate() { Review Comment: Does this JoinSet limit the concurrency in any way? (i.e., if there are 2000 partitions will we get/do we want 2000 threads to spawn?) ########## rust/sedona-spatial-join/src/index/partitioned_index_provider.rs: ########## @@ -0,0 +1,598 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow_schema::SchemaRef; +use datafusion_common::{DataFusionError, Result, SharedResult}; +use datafusion_common_runtime::JoinSet; +use datafusion_execution::memory_pool::MemoryReservation; +use datafusion_expr::JoinType; +use futures::StreamExt; +use parking_lot::Mutex; +use sedona_common::{sedona_internal_err, SpatialJoinOptions}; +use std::ops::DerefMut; +use std::sync::Arc; +use tokio::sync::mpsc; + +use crate::evaluated_batch::evaluated_batch_stream::external::ExternalEvaluatedBatchStream; +use crate::index::BuildPartition; +use crate::partitioning::stream_repartitioner::{SpilledPartition, SpilledPartitions}; +use crate::utils::disposable_async_cell::DisposableAsyncCell; +use crate::{ + index::{SpatialIndex, SpatialIndexBuilder, SpatialJoinBuildMetrics}, + partitioning::SpatialPartition, + spatial_predicate::SpatialPredicate, +}; + +pub(crate) struct PartitionedIndexProvider { + schema: SchemaRef, + spatial_predicate: SpatialPredicate, + options: SpatialJoinOptions, + join_type: JoinType, + probe_threads_count: usize, + metrics: SpatialJoinBuildMetrics, + + /// Data on the build side to build index for + data: BuildSideData, + + /// Async cells for indexes, one per regular partition + index_cells: Vec<DisposableAsyncCell<SharedResult<Arc<SpatialIndex>>>>, + + /// The memory reserved in the build side collection phase. We'll hold them until + /// we don't need to build spatial indexes. + _reservations: Vec<MemoryReservation>, +} + +pub(crate) enum BuildSideData { + SinglePartition(Mutex<Option<Vec<BuildPartition>>>), + MultiPartition(Mutex<SpilledPartitions>), +} + +impl PartitionedIndexProvider { + #[allow(clippy::too_many_arguments)] + pub fn new_multi_partition( + schema: SchemaRef, + spatial_predicate: SpatialPredicate, + options: SpatialJoinOptions, + join_type: JoinType, + probe_threads_count: usize, + partitioned_spill_files: SpilledPartitions, + metrics: SpatialJoinBuildMetrics, + reservations: Vec<MemoryReservation>, + ) -> Self { + let num_partitions = partitioned_spill_files.num_regular_partitions(); + let index_cells = (0..num_partitions) + .map(|_| DisposableAsyncCell::new()) + .collect(); + Self { + schema, + spatial_predicate, + options, + join_type, + probe_threads_count, + metrics, + data: BuildSideData::MultiPartition(Mutex::new(partitioned_spill_files)), + index_cells, + _reservations: reservations, + } + } + + #[allow(clippy::too_many_arguments)] + pub fn new_single_partition( + schema: SchemaRef, + spatial_predicate: SpatialPredicate, + options: SpatialJoinOptions, + join_type: JoinType, + probe_threads_count: usize, + mut build_partitions: Vec<BuildPartition>, + metrics: SpatialJoinBuildMetrics, + ) -> Self { + let reservations = build_partitions + .iter_mut() + .map(|p| p.reservation.take()) + .collect(); + let index_cells = vec![DisposableAsyncCell::new()]; + Self { + schema, + spatial_predicate, + options, + join_type, + probe_threads_count, + metrics, + data: BuildSideData::SinglePartition(Mutex::new(Some(build_partitions))), + index_cells, + _reservations: reservations, + } + } + + pub fn new_empty( + schema: SchemaRef, + spatial_predicate: SpatialPredicate, + options: SpatialJoinOptions, + join_type: JoinType, + probe_threads_count: usize, + metrics: SpatialJoinBuildMetrics, + ) -> Self { + let build_partitions = Vec::new(); + Self::new_single_partition( + schema, + spatial_predicate, + options, + join_type, + probe_threads_count, + build_partitions, + metrics, + ) + } + + pub fn num_regular_partitions(&self) -> usize { + self.index_cells.len() + } + 
+ pub async fn build_or_wait_for_index( + &self, + partition_id: u32, + ) -> Option<Result<Arc<SpatialIndex>>> { + let cell = match self.index_cells.get(partition_id as usize) { + Some(cell) => cell, + None => { + return Some(sedona_internal_err!( + "partition_id {} exceeds {} partitions", + partition_id, + self.index_cells.len() + )) + } + }; + if !cell.is_empty() { + return get_index_from_cell(cell).await; + } + + let res_index = { + let opt_res_index = self.maybe_build_index(partition_id).await; + match opt_res_index { + Some(res_index) => res_index, + None => { + // The build side data for building the index has already been consumed by someone else, + // we just need to wait for the task consumed the data to finish building the index. + return get_index_from_cell(cell).await; + } + } + }; + + match res_index { + Ok(idx) => { + if let Err(e) = cell.set(Ok(Arc::clone(&idx))) { + // This is probably because the cell has been disposed. No one + // will get the index from the cell so this failure is not a big deal. + log::debug!("Cannot set the index into the async cell: {:?}", e); + } + Some(Ok(idx)) + } + Err(err) => { + let err_arc = Arc::new(err); + if let Err(e) = cell.set(Err(Arc::clone(&err_arc))) { + log::debug!( + "Cannot set the index build error into the async cell: {:?}", + e + ); + } + Some(Err(DataFusionError::Shared(err_arc))) + } + } + } + + async fn maybe_build_index(&self, partition_id: u32) -> Option<Result<Arc<SpatialIndex>>> { + match &self.data { + BuildSideData::SinglePartition(build_partition_opt) => { + if partition_id != 0 { + return Some(sedona_internal_err!( + "partition_id for single-partition index is not 0" + )); + } + + // consume the build side data for building the index + let build_partition_opt = { + let mut locked = build_partition_opt.lock(); + std::mem::take(locked.deref_mut()) + }; + + let Some(build_partition) = build_partition_opt else { + // already consumed by previous attempts, the result should be present in the channel. + return None; + }; + Some(self.build_index_for_single_partition(build_partition).await) + } + BuildSideData::MultiPartition(partitioned_spill_files) => { + // consume this partition of build side data for building index + let spilled_partition = { + let mut locked = partitioned_spill_files.lock(); + let partition = SpatialPartition::Regular(partition_id); + if !locked.can_take_spilled_partition(partition) { + // already consumed by previous attempts, the result should be present in the channel. + return None; + } + match locked.take_spilled_partition(partition) { + Ok(spilled_partition) => spilled_partition, + Err(e) => return Some(Err(e)), + } + }; + Some( + self.build_index_for_spilled_partition(spilled_partition) + .await, + ) + } + } + } + + #[allow(unused)] + pub async fn wait_for_index(&self, partition_id: u32) -> Option<Result<Arc<SpatialIndex>>> { Review Comment: Should this be `#[cfg(test)]` or can it be removed? ########## rust/sedona-spatial-join/src/index/partitioned_index_provider.rs: ########## @@ -0,0 +1,598 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_schema::SchemaRef; +use datafusion_common::{DataFusionError, Result, SharedResult}; +use datafusion_common_runtime::JoinSet; +use datafusion_execution::memory_pool::MemoryReservation; +use datafusion_expr::JoinType; +use futures::StreamExt; +use parking_lot::Mutex; +use sedona_common::{sedona_internal_err, SpatialJoinOptions}; +use std::ops::DerefMut; +use std::sync::Arc; +use tokio::sync::mpsc; + +use crate::evaluated_batch::evaluated_batch_stream::external::ExternalEvaluatedBatchStream; +use crate::index::BuildPartition; +use crate::partitioning::stream_repartitioner::{SpilledPartition, SpilledPartitions}; +use crate::utils::disposable_async_cell::DisposableAsyncCell; +use crate::{ + index::{SpatialIndex, SpatialIndexBuilder, SpatialJoinBuildMetrics}, + partitioning::SpatialPartition, + spatial_predicate::SpatialPredicate, +}; + +pub(crate) struct PartitionedIndexProvider { + schema: SchemaRef, + spatial_predicate: SpatialPredicate, + options: SpatialJoinOptions, + join_type: JoinType, + probe_threads_count: usize, + metrics: SpatialJoinBuildMetrics, + + /// Data on the build side to build index for + data: BuildSideData, + + /// Async cells for indexes, one per regular partition + index_cells: Vec<DisposableAsyncCell<SharedResult<Arc<SpatialIndex>>>>, + + /// The memory reserved in the build side collection phase. We'll hold them until + /// we don't need to build spatial indexes. 
+ _reservations: Vec<MemoryReservation>, +} + +pub(crate) enum BuildSideData { + SinglePartition(Mutex<Option<Vec<BuildPartition>>>), + MultiPartition(Mutex<SpilledPartitions>), +} + +impl PartitionedIndexProvider { + #[allow(clippy::too_many_arguments)] + pub fn new_multi_partition( + schema: SchemaRef, + spatial_predicate: SpatialPredicate, + options: SpatialJoinOptions, + join_type: JoinType, + probe_threads_count: usize, + partitioned_spill_files: SpilledPartitions, + metrics: SpatialJoinBuildMetrics, + reservations: Vec<MemoryReservation>, + ) -> Self { + let num_partitions = partitioned_spill_files.num_regular_partitions(); + let index_cells = (0..num_partitions) + .map(|_| DisposableAsyncCell::new()) + .collect(); + Self { + schema, + spatial_predicate, + options, + join_type, + probe_threads_count, + metrics, + data: BuildSideData::MultiPartition(Mutex::new(partitioned_spill_files)), + index_cells, + _reservations: reservations, + } + } + + #[allow(clippy::too_many_arguments)] + pub fn new_single_partition( + schema: SchemaRef, + spatial_predicate: SpatialPredicate, + options: SpatialJoinOptions, + join_type: JoinType, + probe_threads_count: usize, + mut build_partitions: Vec<BuildPartition>, + metrics: SpatialJoinBuildMetrics, + ) -> Self { + let reservations = build_partitions + .iter_mut() + .map(|p| p.reservation.take()) + .collect(); + let index_cells = vec![DisposableAsyncCell::new()]; + Self { + schema, + spatial_predicate, + options, + join_type, + probe_threads_count, + metrics, + data: BuildSideData::SinglePartition(Mutex::new(Some(build_partitions))), + index_cells, + _reservations: reservations, + } + } + + pub fn new_empty( + schema: SchemaRef, + spatial_predicate: SpatialPredicate, + options: SpatialJoinOptions, + join_type: JoinType, + probe_threads_count: usize, + metrics: SpatialJoinBuildMetrics, + ) -> Self { + let build_partitions = Vec::new(); + Self::new_single_partition( + schema, + spatial_predicate, + options, + join_type, + probe_threads_count, + build_partitions, + metrics, + ) + } + + pub fn num_regular_partitions(&self) -> usize { + self.index_cells.len() + } + + pub async fn build_or_wait_for_index( + &self, + partition_id: u32, + ) -> Option<Result<Arc<SpatialIndex>>> { + let cell = match self.index_cells.get(partition_id as usize) { + Some(cell) => cell, + None => { + return Some(sedona_internal_err!( + "partition_id {} exceeds {} partitions", + partition_id, + self.index_cells.len() + )) + } + }; + if !cell.is_empty() { + return get_index_from_cell(cell).await; + } + + let res_index = { + let opt_res_index = self.maybe_build_index(partition_id).await; + match opt_res_index { + Some(res_index) => res_index, + None => { + // The build side data for building the index has already been consumed by someone else, + // we just need to wait for the task consumed the data to finish building the index. + return get_index_from_cell(cell).await; + } + } + }; + + match res_index { + Ok(idx) => { + if let Err(e) = cell.set(Ok(Arc::clone(&idx))) { + // This is probably because the cell has been disposed. No one + // will get the index from the cell so this failure is not a big deal. 
+ log::debug!("Cannot set the index into the async cell: {:?}", e); + } + Some(Ok(idx)) + } + Err(err) => { + let err_arc = Arc::new(err); + if let Err(e) = cell.set(Err(Arc::clone(&err_arc))) { + log::debug!( + "Cannot set the index build error into the async cell: {:?}", + e + ); + } + Some(Err(DataFusionError::Shared(err_arc))) + } + } + } + + async fn maybe_build_index(&self, partition_id: u32) -> Option<Result<Arc<SpatialIndex>>> { + match &self.data { + BuildSideData::SinglePartition(build_partition_opt) => { + if partition_id != 0 { + return Some(sedona_internal_err!( + "partition_id for single-partition index is not 0" + )); + } + + // consume the build side data for building the index + let build_partition_opt = { + let mut locked = build_partition_opt.lock(); + std::mem::take(locked.deref_mut()) + }; + + let Some(build_partition) = build_partition_opt else { + // already consumed by previous attempts, the result should be present in the channel. + return None; + }; + Some(self.build_index_for_single_partition(build_partition).await) + } + BuildSideData::MultiPartition(partitioned_spill_files) => { + // consume this partition of build side data for building index + let spilled_partition = { + let mut locked = partitioned_spill_files.lock(); + let partition = SpatialPartition::Regular(partition_id); + if !locked.can_take_spilled_partition(partition) { + // already consumed by previous attempts, the result should be present in the channel. + return None; + } + match locked.take_spilled_partition(partition) { + Ok(spilled_partition) => spilled_partition, + Err(e) => return Some(Err(e)), + } + }; + Some( + self.build_index_for_spilled_partition(spilled_partition) + .await, + ) + } + } + } + + #[allow(unused)] + pub async fn wait_for_index(&self, partition_id: u32) -> Option<Result<Arc<SpatialIndex>>> { + let cell = match self.index_cells.get(partition_id as usize) { + Some(cell) => cell, + None => { + return Some(sedona_internal_err!( + "partition_id {} exceeds {} partitions", + partition_id, + self.index_cells.len() + )) + } + }; + + get_index_from_cell(cell).await + } + + pub fn dispose_index(&self, partition_id: u32) { + if let Some(cell) = self.index_cells.get(partition_id as usize) { + cell.dispose(); + } + } + + pub fn num_loaded_indexes(&self) -> usize { + self.index_cells + .iter() + .filter(|index_cell| index_cell.is_set()) + .count() + } + + async fn build_index_for_single_partition( + &self, + build_partitions: Vec<BuildPartition>, + ) -> Result<Arc<SpatialIndex>> { + let mut index_builder = SpatialIndexBuilder::new( + Arc::clone(&self.schema), + self.spatial_predicate.clone(), + self.options.clone(), + self.join_type, + self.probe_threads_count, + self.metrics.clone(), + )?; + + for build_partition in build_partitions { + let stream = build_partition.build_side_batch_stream; + let geo_statistics = build_partition.geo_statistics; + index_builder.add_stream(stream, geo_statistics).await?; + } + + let index = index_builder.finish()?; + Ok(Arc::new(index)) + } + + async fn build_index_for_spilled_partition( + &self, + spilled_partition: SpilledPartition, + ) -> Result<Arc<SpatialIndex>> { + let mut index_builder = SpatialIndexBuilder::new( + Arc::clone(&self.schema), + self.spatial_predicate.clone(), + self.options.clone(), + self.join_type, + self.probe_threads_count, + self.metrics.clone(), + )?; + + // Spawn tasks to load indexed batches from spilled files concurrently + let (spill_files, geo_statistics, _) = spilled_partition.into_inner(); + let mut join_set: 
JoinSet<Result<(), DataFusionError>> = JoinSet::new();
+ let (tx, mut rx) = mpsc::channel(spill_files.len() * 2 + 1);
+ for spill_file in spill_files {

Review Comment:
   Is there any value to limiting the concurrency? (i.e., what if there are 2000 spill files, and if so will we get/do we want to spawn 2000 threads?)
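   One way to bound the fan-out here would be to drive the spill-file loads through a concurrency-limited stream instead of spawning one task per file. The sketch below is illustrative only: `SpillFile`, `load_spill_file`, and the limit of 8 are hypothetical stand-ins, not the PR's actual types or configuration.

```rust
// Minimal sketch of capping concurrent spill-file reads with buffer_unordered.
// SpillFile and load_spill_file are placeholders for the PR's spill-reading code.
use futures::{stream, StreamExt};

const MAX_CONCURRENT_SPILL_READS: usize = 8; // assumed tunable cap

struct SpillFile {
    path: String,
}

async fn load_spill_file(file: SpillFile) -> Result<usize, std::io::Error> {
    // Placeholder: read and decode the batches from one spilled file,
    // returning some per-file result (here, just a byte count stand-in).
    Ok(file.path.len())
}

async fn load_all_spill_files(spill_files: Vec<SpillFile>) -> Result<Vec<usize>, std::io::Error> {
    // At most MAX_CONCURRENT_SPILL_READS loads are polled at a time,
    // no matter how many spill files exist.
    // Note: buffer_unordered yields results in completion order, not input order.
    stream::iter(spill_files)
        .map(load_spill_file)
        .buffer_unordered(MAX_CONCURRENT_SPILL_READS)
        .collect::<Vec<_>>()
        .await
        .into_iter()
        .collect()
}
```

   Alternatively, keeping the existing `JoinSet` plus `mpsc` structure and having each spawned task acquire a permit from a `tokio::sync::Semaphore` before reading its file would impose the same cap; which is preferable likely depends on whether the loads are mostly I/O or CPU-bound decode work.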
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]