alamb commented on code in PR #11647: URL: https://github.com/apache/datafusion/pull/11647#discussion_r1691292992
########## datafusion/physical-plan/src/coalescer/mod.rs: ########## @@ -0,0 +1,204 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// TEMP +pub mod filter; + +use crate::coalescer::filter::{filter_array, FilterBuilder}; +use arrow::compute::concat_batches; +use arrow_array::{BooleanArray, RecordBatch, RecordBatchOptions}; +use arrow_schema::SchemaRef; +use datafusion_common::Result; +use std::sync::Arc; + +/// Concatenate multiple record batches into larger batches +/// +/// See [`CoalesceBatchesExec`] for more details. +/// +/// Notes: +/// +/// 1. The output rows are in the same order as the input rows +/// +/// 2. The output is a sequence of batches, with all but the last being at least +/// `target_batch_size` rows. +/// +/// 3. This structure also handles other optimizations such as a +/// combined filter/coalesce operation. 
+#[derive(Debug)] +pub struct BatchCoalescer { + /// The input schema + schema: SchemaRef, + /// Minimum number of rows for coalesced batches + target_batch_size: usize, + /// Buffered batches + buffer: Vec<RecordBatch>, + /// Buffered row count + buffered_rows: usize, +} + +impl BatchCoalescer { + /// Create a new BatchCoalescer that produces batches of at least `target_batch_size` rows + pub fn new(schema: SchemaRef, target_batch_size: usize) -> Self { + Self { + schema, + target_batch_size, + buffer: vec![], + buffered_rows: 0, + } + } + + /// Return the schema of the output batches + pub fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + /// Add a batch to the coalescer, returning a batch if the target batch size is reached + pub fn push_batch(&mut self, batch: RecordBatch) -> Result<Option<RecordBatch>> { + if batch.num_rows() >= self.target_batch_size && self.buffer.is_empty() { + return Ok(Some(batch)); + } + // discard empty batches + if batch.num_rows() == 0 { + return Ok(None); + } + // add to the buffered batches + self.buffered_rows += batch.num_rows(); + self.buffer.push(batch); + // check to see if we have enough batches yet + let batch = if self.buffered_rows >= self.target_batch_size { + // combine the batches and return + let batch = concat_batches(&self.schema, &self.buffer)?; + // reset buffer state + self.buffer.clear(); + self.buffered_rows = 0; + // return batch + Some(batch) + } else { + None + }; + Ok(batch) + } + + /// Push the rows in `batch` that match the `predicate` (i.e. where the values are true). 
+ pub fn push_batch_with_filter( + &mut self, + batch: &RecordBatch, + predicate: &BooleanArray, + ) -> Result<Option<RecordBatch>> { + let mut filter_builder = FilterBuilder::new(predicate); + if batch.num_columns() > 1 { + // Only optimize if filtering more than one column + filter_builder = filter_builder.optimize(); + } + let filter = filter_builder.build(); + + // TODO the idea here is to iteratively build up the filtered batch + // and push it when it reaches the target size rather than buffering directly + let filtered_arrays = batch + .columns() + .iter() + .map(|a| filter_array(a, &filter)) + .collect::<Result<Vec<_>, _>>()?; + let options = RecordBatchOptions::default().with_row_count(Some(filter.count())); + let filtered_batch = Review Comment: The key goal is to avoid this batch materialization. I'll keep hacking on it tomorrow / later -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
