Kontinuation commented on code in PR #555: URL: https://github.com/apache/sedona-db/pull/555#discussion_r2736113234
########## rust/sedona-spatial-join/src/prepare.rs: ########## @@ -0,0 +1,416 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{mem, sync::Arc}; + +use arrow_schema::SchemaRef; +use datafusion_common::Result; +use datafusion_common_runtime::JoinSet; +use datafusion_execution::{ + disk_manager::RefCountedTempFile, memory_pool::MemoryConsumer, SendableRecordBatchStream, + TaskContext, +}; +use datafusion_expr::JoinType; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use fastrand::Rng; +use sedona_common::{sedona_internal_err, NumSpatialPartitionsConfig, SedonaOptions}; +use sedona_expr::statistics::GeoStatistics; +use sedona_geometry::bounding_box::BoundingBox; + +use crate::{ + index::{ + memory_plan::{compute_memory_plan, PartitionMemorySummary}, + partitioned_index_provider::PartitionedIndexProvider, + BuildSideBatchesCollector, CollectBuildSideMetrics, SpatialJoinBuildMetrics, + }, + partitioning::{ + kdb::KDBPartitioner, + stream_repartitioner::{SpilledPartition, SpilledPartitions, StreamRepartitioner}, + PartitionedSide, SpatialPartition, SpatialPartitioner, + }, + spatial_predicate::SpatialPredicate, + utils::bbox_sampler::BoundingBoxSamples, +}; + +pub(crate) struct SpatialJoinComponents { + pub partitioned_index_provider: Arc<PartitionedIndexProvider>, +} + +#[allow(clippy::too_many_arguments)] +pub(crate) async fn prepare_spatial_join_components( + context: Arc<TaskContext>, + build_schema: SchemaRef, + build_streams: Vec<SendableRecordBatchStream>, + spatial_predicate: SpatialPredicate, + join_type: JoinType, + probe_threads_count: usize, + metrics: ExecutionPlanMetricsSet, + seed: u64, +) -> Result<SpatialJoinComponents> { + let session_config = context.session_config(); + let target_batch_size = session_config.batch_size(); + let sedona_options = session_config + .options() + .extensions + .get::<SedonaOptions>() + .cloned() + .unwrap_or_default(); + let concurrent = sedona_options.spatial_join.concurrent_build_side_collection; + let spilled_batch_in_memory_size_threshold = if sedona_options + .spatial_join + .spilled_batch_in_memory_size_threshold + == 0 + { + None + } else { + Some( + sedona_options + .spatial_join + .spilled_batch_in_memory_size_threshold, + ) + }; + let spill_compression = session_config.spill_compression(); + let memory_pool = context.memory_pool(); + let num_partitions = build_streams.len(); + if num_partitions == 0 { + log::debug!("Build side has no data. Creating empty spatial index."); + let partitioned_index_provider = PartitionedIndexProvider::new_empty( + build_schema, + spatial_predicate, + sedona_options.spatial_join, + join_type, + probe_threads_count, + SpatialJoinBuildMetrics::new(0, &metrics), + ); + return Ok(SpatialJoinComponents { + partitioned_index_provider: Arc::new(partitioned_index_provider), + }); + } + + let runtime_env = context.runtime_env(); + let collector = BuildSideBatchesCollector::new( + spatial_predicate.clone(), + sedona_options.spatial_join.clone(), + Arc::clone(&runtime_env), + spill_compression, + ); + let mut collect_metrics_vec = Vec::with_capacity(num_partitions); + let mut reservations = Vec::with_capacity(num_partitions); + for k in 0..num_partitions { + let consumer = + MemoryConsumer::new(format!("SpatialJoinCollectBuildSide[{k}]")).with_can_spill(true); + let reservation = consumer.register(memory_pool); + reservations.push(reservation); + collect_metrics_vec.push(CollectBuildSideMetrics::new(k, &metrics)); + } + + let mut rng = Rng::with_seed(seed); + let mut build_partitions = collector + .collect_all( + build_streams, + reservations, + collect_metrics_vec.clone(), + concurrent, + rng.u64(0..0xFFFF), + ) + .await?; + let memory_plan = + compute_memory_plan(build_partitions.iter().map(PartitionMemorySummary::from))?; + let mut memory_plan_str = String::new(); + if memory_plan.debug_print(&mut memory_plan_str).is_ok() { + log::debug!( + "Computed memory plan for spatial join:\n{}", + memory_plan_str + ); + } + let num_partitions = match sedona_options.spatial_join.debug.num_spatial_partitions { + NumSpatialPartitionsConfig::Auto => memory_plan.num_partitions, + NumSpatialPartitionsConfig::Fixed(n) => { + log::debug!("Override number of spatial partitions to {}", n); + n + } + }; + let memory_for_intermittent_usage = match sedona_options + .spatial_join + .debug + .memory_for_intermittent_usage + { + Some(value) => { + log::debug!("Override memory for intermittent usage to {}", value); + value + } + None => memory_plan.memory_for_intermittent_usage, + }; + + if num_partitions == 1 { + log::debug!("Running single-partitioned in-memory spatial join"); + let partitioned_index_provider = PartitionedIndexProvider::new_single_partition( + build_schema, + spatial_predicate, + sedona_options.spatial_join, + join_type, + probe_threads_count, + build_partitions, + SpatialJoinBuildMetrics::new(0, &metrics), + ); + Ok(SpatialJoinComponents { + partitioned_index_provider: Arc::new(partitioned_index_provider), + }) + } else { + let build_partitioner: Arc<dyn SpatialPartitioner> = { + // Use spatial partitioners to partition the build side and the probe side, this will + // reduce the amount of work needed for probing each partitioned index. + // The KDB partitioner is built using the collected bounding box samples. + let mut bbox_samples = BoundingBoxSamples::empty(); + let mut geo_stats = GeoStatistics::empty(); + for partition in &mut build_partitions { + let samples = mem::take(&mut partition.bbox_samples); + bbox_samples = bbox_samples.combine(samples, &mut rng); + geo_stats.merge(&partition.geo_statistics); + } + + let extent = geo_stats + .bbox() + .cloned() + .unwrap_or(BoundingBox::xy((0, 0), (0, 0))); + let extent = if extent.is_empty() { + BoundingBox::xy((0, 0), (0, 0)) + } else { + extent + }; + let mut samples = bbox_samples.take_samples(); + let max_items_per_node = 1.max(samples.len() / num_partitions); + let max_levels = num_partitions; + + log::debug!( + "Number of samples: {}, max_items_per_node: {}, max_levels: {}", + samples.len(), + max_items_per_node, + max_levels + ); + rng.shuffle(&mut samples); + let kdb_partitioner = + KDBPartitioner::build(samples.into_iter(), max_items_per_node, max_levels, extent)?; + log::debug!( + "Built KDB spatial partitioner with {} partitions", + num_partitions + ); + let mut kdb_dbg_str = String::new(); + if kdb_partitioner.debug_print(&mut kdb_dbg_str).is_ok() { + log::debug!("KDB partitioner debug info:\n{}", kdb_dbg_str); + } + + Arc::new(kdb_partitioner) + }; + + let num_partitions = build_partitioner.num_regular_partitions(); + log::debug!("Actual number of spatial partitions: {}", num_partitions); + + // Spawn each task for each build partition to repartition the data using the spatial partitioner for + // the build/indexed side + let mut join_set = JoinSet::new(); + let buffer_bytes_threshold = memory_for_intermittent_usage / build_partitions.len(); + let mut reservations = Vec::with_capacity(build_partitions.len()); + for (k, partition) in build_partitions.into_iter().enumerate() { + let stream = partition.build_side_batch_stream; + let reservation = partition.reservation; + let metrics = &collect_metrics_vec[k]; + let spill_metrics = metrics.spill_metrics(); + let runtime_env = Arc::clone(&runtime_env); + let partitioner = Arc::clone(&build_partitioner); + join_set.spawn(async move { + let partitioned_spill_files = StreamRepartitioner::builder( + runtime_env, + partitioner, + PartitionedSide::BuildSide, + spill_metrics, + ) + .spill_compression(spill_compression) + .buffer_bytes_threshold(buffer_bytes_threshold) + .target_batch_size(target_batch_size) + .spilled_batch_in_memory_size_threshold(spilled_batch_in_memory_size_threshold) + .build() + .repartition_stream(stream) + .await; + partitioned_spill_files + }); + reservations.push(reservation); + } + + let results = join_set.join_all().await; + let mut partitioned_spill_files_vec = Vec::with_capacity(results.len()); + for result in results { + partitioned_spill_files_vec.push(result?); + } Review Comment: Switched to use collect. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
