geoffreyclaude commented on code in PR #15563: URL: https://github.com/apache/datafusion/pull/15563#discussion_r2031164505
########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -183,6 +208,86 @@ impl TopK { // update memory reservation self.reservation.try_resize(self.size())?; + + // flag the topK as finished if we know that all + // subsequent batches are guaranteed to be worse than the + // current topK + self.attempt_early_completion(&batch)?; + + Ok(()) + } + + /// If input ordering shares a common sort prefix with the TopK, and if the TopK's heap is full, + /// check if the computation can be finished early. + /// This is the case if the last row of the current batch is strictly worse than the worst row in the heap, + /// comparing only on the shared prefix columns. + fn attempt_early_completion(&mut self, batch: &RecordBatch) -> Result<()> { + // Early exit if the batch is empty as there is no last row to extract from it. + if batch.num_rows() == 0 { + return Ok(()); + } + + // `common_prefix_converter` is only `Some` if the input ordering has a common prefix with the TopK, + // so early exit if it is `None`. + let prefix_converter = match &self.common_prefix_converter { + Some(pc) => pc, + None => return Ok(()), + }; + + // Early exit if the heap is not full (`heap.max()` only returns `Some` if the heap is full). + let worst_topk_row = match self.heap.max() { + Some(row) => row, + None => return Ok(()), + }; Review Comment: ⛳ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org