adriangb commented on code in PR #15770: URL: https://github.com/apache/datafusion/pull/15770#discussion_r2083516211
########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -570,6 +680,47 @@ impl TopKHeap { + self.store.size() + self.owned_bytes } + + fn get_threshold_values( + &self, + sort_exprs: &[PhysicalSortExpr], + ) -> Result<Option<Vec<ScalarValue>>> { + // If the heap doesn't have k elements yet, we can't create thresholds + let max_row = match self.max() { + Some(row) => row, + None => return Ok(None), + }; + + // Get the batch that contains the max row + let batch_entry = match self.store.get(max_row.batch_id) { + Some(entry) => entry, + None => return internal_err!("Invalid batch ID in TopKRow"), + }; + + // Extract threshold values for each sort expression + let mut scalar_values = Vec::with_capacity(sort_exprs.len()); + for sort_expr in sort_exprs { Review Comment: Maybe? We should measure overhead / execution time for a large case first. If it’s under a couple hundred ms it’s probably not worth the overhead. If we do parallelize I assume we can just pipe the asyncness down to here and use Tokio tasks? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org