zhuqi-lucas commented on PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#issuecomment-2812565082

   > So to change it in your diff (didn't change the documentation).
   > 
   > I would like to keep the original `StreamingMergeBuilder` case and the `if 
self.reservation.size() < self.sort_in_place_threshold_bytes` expression so 
that we only have the "avoid concat for non-sort columns" optimization in place 
and see if this improves on all sort queries.
   > 
   > ```diff
   > -        // If less than sort_in_place_threshold_bytes, concatenate and 
sort in place
   > -        if self.reservation.size() < self.sort_in_place_threshold_bytes {
   > -            // Concatenate memory batches together and sort
   > -            let batch = concat_batches(&self.schema, 
&self.in_mem_batches)?;
   > -            self.in_mem_batches.clear();
   > -            self.reservation
   > -                .try_resize(get_reserved_byte_for_record_batch(&batch))
   > -                .map_err(Self::err_with_oom_context)?;
   > -            let reservation = self.reservation.take();
   > -            return self.sort_batch_stream(batch, metrics, reservation);
   > +        let mut columns_by_expr: Vec<Vec<ArrayRef>> = vec![vec![]; 
self.expr.len()];
   > +        for batch in &self.in_mem_batches {
   > +            for (i, expr) in self.expr.iter().enumerate() {
   > +                let col = expr.evaluate_to_sort_column(batch)?.values;
   > +                columns_by_expr[i].push(col);
   > +            }
   >          }
   >  
   > -        let streams = std::mem::take(&mut self.in_mem_batches)
   > -            .into_iter()
   > -            .map(|batch| {
   > -                let metrics = self.metrics.baseline.intermediate();
   > -                let reservation = self
   > -                    .reservation
   > -                    .split(get_reserved_byte_for_record_batch(&batch));
   > -                let input = self.sort_batch_stream(batch, metrics, 
reservation)?;
   > -                Ok(spawn_buffered(input, 1))
   > -            })
   > -            .collect::<Result<_>>()?;
   > +        // For each sort expression, concatenate arrays from all batches 
into one global array
   > +        let mut sort_columns = Vec::with_capacity(self.expr.len());
   > +        for (arrays, expr) in 
columns_by_expr.into_iter().zip(self.expr.iter()) {
   > +            let array = concat(
   > +                &arrays
   > +                    .iter()
   > +                    .map(|a| a.as_ref())
   > +                    .collect::<Vec<&dyn Array>>(),
   > +            )?;
   > +            sort_columns.push(SortColumn {
   > +                values: array,
   > +                options: expr.options.into(),
   > +            });
   > +        }
   >  
   > -        let expressions: LexOrdering = 
self.expr.iter().cloned().collect();
   > +        // ===== Phase 2: Compute global sorted indices =====
   > +        // Use `lexsort_to_indices` to get global row indices in sorted 
order (as if all batches were concatenated)
   > +        let indices = lexsort_to_indices(&sort_columns, None)?;
   >  
   > -        StreamingMergeBuilder::new()
   > -            .with_streams(streams)
   > -            .with_schema(Arc::clone(&self.schema))
   > -            .with_expressions(expressions.as_ref())
   > -            .with_metrics(metrics)
   > -            .with_batch_size(self.batch_size)
   > -            .with_fetch(None)
   > -            .with_reservation(self.merge_reservation.new_empty())
   > -            .build()
   > +        // ===== Phase 3: Reorder each column using the global sorted 
indices =====
   > +        let num_columns = self.schema.fields().len();
   > +
   > +        let batch_indices: Vec<(usize, usize)> = self
   > +            .in_mem_batches
   > +            .iter()
   > +            .enumerate()
   > +            .map(|(batch_id, batch)| (0..batch.num_rows()).map(move |i| 
(batch_id, i)))
   > +            .flatten()
   > +            .collect();
   > +
   > +        // For each column:
   > +        // 1. Concatenate all batch arrays for this column (in the same 
order as assumed by `lexsort_to_indices`)
   > +        // 2. Use Arrow's `take` function to reorder the column by sorted 
indices
   > +        let interleave_indices: Vec<(usize, usize)> = indices
   > +            .values()
   > +            .iter()
   > +            .map(|x| batch_indices[*x as usize])
   > +            .collect();
   > +        // Build a RecordBatch from the sorted columns
   > +
   > +        let batches: Vec<&RecordBatch> = 
self.in_mem_batches.iter().collect();
   > +
   > +        let sorted_batch =
   > +            interleave_record_batch(batches.as_ref(), 
&interleave_indices)?;
   > +        // Clear in-memory batches and update reservation
   > +        self.in_mem_batches.clear();
   > +        self.reservation
   > +            
.try_resize(get_reserved_byte_for_record_batch(&sorted_batch))?;
   > +        let reservation = self.reservation.take();
   > +
   > +        // ===== Phase 4: Construct the resulting stream =====
   > +        let stream = futures::stream::once(async move {
   > +            let _timer = metrics.elapsed_compute().timer();
   > +            metrics.record_output(sorted_batch.num_rows());
   > +            drop(reservation);
   > +            Ok(sorted_batch)
   > +        });
   > +
   > +        Ok(Box::pin(RecordBatchStreamAdapter::new(
   > +            self.schema.clone(),
   > +            stream,
   > +        )))
   > ```
   
   @Dandandan Wait, I think I misunderstood this comment: we only need to 
change the `if self.reservation.size() < self.sort_in_place_threshold_bytes` 
case on the main branch and keep everything else unchanged. I will try this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to