Kontinuation commented on code in PR #563: URL: https://github.com/apache/sedona-db/pull/563#discussion_r2748718865
########## rust/sedona-spatial-join/src/probe/partitioned_stream_provider.rs: ########## @@ -0,0 +1,500 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ops::DerefMut; +use std::sync::Arc; + +use arrow_schema::SchemaRef; +use datafusion::config::SpillCompression; +use datafusion_common::{DataFusionError, Result}; +use datafusion_execution::runtime_env::RuntimeEnv; +use parking_lot::Mutex; + +use crate::probe::first_pass_stream::FirstPassStream; +use crate::probe::non_partitioned_stream::NonPartitionedStream; +use crate::probe::ProbeStreamMetrics; +use crate::{ + evaluated_batch::evaluated_batch_stream::{ + external::ExternalEvaluatedBatchStream, SendableEvaluatedBatchStream, + }, + partitioning::{ + stream_repartitioner::{SpilledPartitions, StreamRepartitioner}, + PartitionedSide, SpatialPartition, SpatialPartitioner, + }, +}; + +#[derive(Clone)] +pub(crate) struct ProbeStreamOptions { + pub partitioner: Option<Arc<dyn SpatialPartitioner>>, + pub target_batch_rows: usize, + pub spill_compression: SpillCompression, + pub buffer_bytes_threshold: usize, + pub spilled_batch_in_memory_size_threshold: Option<usize>, +} + +pub(crate) struct PartitionedProbeStreamProvider { + state: 
Arc<Mutex<ProbeStreamState>>, + runtime_env: Arc<RuntimeEnv>, + options: ProbeStreamOptions, + schema: SchemaRef, + metrics: ProbeStreamMetrics, +} + +enum ProbeStreamState { + Pending { + source: SendableEvaluatedBatchStream, + }, + FirstPass, + SubsequentPass { + manifest: ProbePartitionManifest, + }, + NonPartitionedConsumed, + Failed(Arc<DataFusionError>), +} + +impl PartitionedProbeStreamProvider { + pub fn new( + runtime_env: Arc<RuntimeEnv>, + options: ProbeStreamOptions, + source: SendableEvaluatedBatchStream, + metrics: ProbeStreamMetrics, + ) -> Self { + let schema = source.schema(); + Self { + state: Arc::new(Mutex::new(ProbeStreamState::Pending { source })), + runtime_env, + options, + schema, + metrics, + } + } + + pub fn stream_for(&self, partition: SpatialPartition) -> Result<SendableEvaluatedBatchStream> { + match partition { + SpatialPartition::None => Err(DataFusionError::Execution( + "SpatialPartition::None should be handled via outer join logic".into(), + )), + SpatialPartition::Regular(0) => self.first_pass_stream(), + SpatialPartition::Regular(_) | SpatialPartition::Multi => { + if self.options.partitioner.is_none() { + Err(DataFusionError::Execution( + "Non-partitioned probe stream only supports Regular(0)".into(), + )) + } else { + self.subsequent_pass_stream(partition) + } + } + } + } + + fn first_pass_stream(&self) -> Result<SendableEvaluatedBatchStream> { + if self.options.partitioner.is_none() { + return self.non_partitioned_first_pass_stream(); + } + + let schema = Arc::clone(&self.schema); + let mut state_guard = self.state.lock(); + match std::mem::replace(&mut *state_guard, ProbeStreamState::FirstPass) { + ProbeStreamState::Pending { source } => { + let partitioner = Arc::clone( + self.options + .partitioner + .as_ref() + .expect("Partitioned first pass requires a partitioner"), + ); + let repartitioner = StreamRepartitioner::builder( + Arc::clone(&self.runtime_env), + Arc::clone(&partitioner), + PartitionedSide::ProbeSide, + 
self.metrics.spill_metrics.clone(), + ) + .spill_compression(self.options.spill_compression) + .buffer_bytes_threshold(self.options.buffer_bytes_threshold) + .target_batch_size(self.options.target_batch_rows) + .spilled_batch_in_memory_size_threshold( + self.options.spilled_batch_in_memory_size_threshold, + ) + .build(); + + let state = Arc::clone(&self.state); + let callback = move |res: Result<SpilledPartitions>| { + let mut guard = state.lock(); + *guard = match res { + Ok(mut spills) => { + let mut s = String::new(); + if spills.debug_print(&mut s).is_ok() { + log::debug!("Probe side spilled partitions:\n{}", s); + } + + // Sanity check: Regular(0) and None should be empty + let mut check_empty = |partition: SpatialPartition| -> Result<()> { + let spilled = spills.take_spilled_partition(partition)?; + if !spilled.into_spill_files().is_empty() { + return Err(DataFusionError::Execution(format!( + "{:?} partition should not have spilled data", + partition + ))); + } + Ok(()) + }; + + match check_empty(SpatialPartition::Regular(0)) + .and_then(|_| check_empty(SpatialPartition::None)) + { + Ok(_) => ProbeStreamState::SubsequentPass { + manifest: ProbePartitionManifest::new(schema, spills), + }, + Err(err) => ProbeStreamState::Failed(Arc::new(err)), + } + } + Err(err) => { + let err_arc = Arc::new(err); + ProbeStreamState::Failed(Arc::clone(&err_arc)) + } + }; + Ok(()) + }; + + let first_pass = FirstPassStream::new( + source, + repartitioner, + partitioner, + self.metrics.clone(), + callback, + ); + Ok(Box::pin(first_pass)) + } + ProbeStreamState::FirstPass => Err(DataFusionError::Execution( + "First pass already running for partitioned probe stream".into(), + )), + ProbeStreamState::SubsequentPass { .. 
} => Err(DataFusionError::Execution( + "First pass already completed".into(), + )), + ProbeStreamState::NonPartitionedConsumed => Err(DataFusionError::Execution( + "Non-partitioned probe stream already consumed".into(), + )), + ProbeStreamState::Failed(err) => Err(DataFusionError::Shared(err)), + } + } + + fn non_partitioned_first_pass_stream(&self) -> Result<SendableEvaluatedBatchStream> { + let mut state_guard = self.state.lock(); + match std::mem::replace(&mut *state_guard, ProbeStreamState::NonPartitionedConsumed) { + ProbeStreamState::Pending { source } => Ok(Box::pin(NonPartitionedStream::new( + source, + self.metrics.clone(), + ))), + ProbeStreamState::NonPartitionedConsumed => Err(DataFusionError::Execution( + "Non-partitioned probe stream already consumed".into(), + )), + ProbeStreamState::Failed(err) => Err(DataFusionError::Shared(err)), + _ => Err(DataFusionError::Execution( + "Non-partitioned probe stream is not available".into(), + )), + } + } + + fn subsequent_pass_stream( + &self, + partition: SpatialPartition, + ) -> Result<SendableEvaluatedBatchStream> { + if self.options.partitioner.is_none() { + return Err(DataFusionError::Execution( + "Non-partitioned probe stream cannot serve additional partitions".into(), + )); + } + let mut locked = self.state.lock(); + let manifest = match locked.deref_mut() { + ProbeStreamState::SubsequentPass { manifest } => manifest, + ProbeStreamState::Failed(err) => return Err(DataFusionError::Shared(Arc::clone(err))), + _ => { + return Err(DataFusionError::Execution( + "Partitioned probe stream warm-up not finished".into(), + )) + } + }; + + { + // let mut manifest = manifest.lock(); + manifest.stream_for(partition) + } + } + + pub fn get_partition_row_count(&self, partition: SpatialPartition) -> Result<usize> { + let mut locked = self.state.lock(); + let manifest = match locked.deref_mut() { + ProbeStreamState::SubsequentPass { manifest } => manifest, + ProbeStreamState::Failed(err) => return 
Err(DataFusionError::Shared(Arc::clone(err))), + _ => { + return Err(DataFusionError::Execution( + "Partitioned probe stream warm-up not finished".into(), + )) + } + }; + manifest.get_partition_row_count(partition) + } +} + +pub struct ProbePartitionManifest { + schema: SchemaRef, + slots: SpilledPartitions, +} + +impl ProbePartitionManifest { + fn new(schema: SchemaRef, spills: SpilledPartitions) -> Self { + Self { + schema, + slots: spills, + } + } + + fn get_partition_row_count(&self, partition: SpatialPartition) -> Result<usize> { + let spilled = self.slots.get_spilled_partition(partition)?; + Ok(spilled.num_rows()) + } + + fn stream_for(&mut self, partition: SpatialPartition) -> Result<SendableEvaluatedBatchStream> { + match partition { + SpatialPartition::Regular(0) => Err(DataFusionError::Execution( + "Partition 0 is only available during the first pass".into(), + )), + SpatialPartition::None => Err(DataFusionError::Execution( + "SpatialPartition::None should not request a probe stream".into(), + )), + SpatialPartition::Regular(_) => { + let spilled = self.slots.take_spilled_partition(partition)?; + Ok(Box::pin( + ExternalEvaluatedBatchStream::try_from_spill_files( + Arc::clone(&self.schema), + spilled.into_spill_files(), + )?, + )) + } + SpatialPartition::Multi => { + let spilled = self.slots.get_spilled_partition(partition)?; + Ok(Box::pin( + ExternalEvaluatedBatchStream::try_from_spill_files( + Arc::clone(&self.schema), + spilled.into_spill_files(), + )?, + )) + } Review Comment: It is designed to be consumed multiple times and is intentionally asymmetric with regular partitions. There is no problem. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
