gabotechs commented on code in PR #15563: URL: https://github.com/apache/datafusion/pull/15563#discussion_r2030583037
########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -197,6 +302,9 @@ impl TopK { row_converter: _, scratch_rows: _, mut heap, + common_prefix_converter: _, + common_sort_prefix: _, + finished: _, Review Comment: 🤔 I wonder why the spread operator was just not used here in the first place. For consistency it might make more sense to leave it as is. ```suggestion .. ``` ########## datafusion/physical-expr/src/equivalence/properties/mod.rs: ########## @@ -546,22 +546,23 @@ impl EquivalenceProperties { self.ordering_satisfy_requirement(&sort_requirements) } - /// Checks whether the given sort requirements are satisfied by any of the - /// existing orderings. - pub fn ordering_satisfy_requirement(&self, reqs: &LexRequirement) -> bool { - let mut eq_properties = self.clone(); - // First, standardize the given requirement: - let normalized_reqs = eq_properties.normalize_sort_requirements(reqs); - + /// Returns the number of consecutive requirements (starting from the left) + /// that are satisfied by the plan ordering. + fn compute_matching_prefix_length(&self, normalized_reqs: &LexRequirement) -> usize { Review Comment: I see that through this PR multiple terms are used for referring to the same thing: "matching_prefix", "sort_prefix", "common_prefix", "common_sort_prefix". 
As it's not obvious at first sight that all are referring to the same thing, it might help readers to use the same term everywhere: `compute_common_sort_prefix_length`, `extract_common_sort_prefix`, `common_sort_prefix_converter`, `compute_common_sort_prefix`, `common_sort_prefix=[non_nullable_col@1 ASC]` ########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -183,6 +208,86 @@ impl TopK { // update memory reservation self.reservation.try_resize(self.size())?; + + // flag the topK as finished if we know that all + // subsequent batches are guaranteed to be worse than the + // current topK + self.attempt_early_completion(&batch)?; + + Ok(()) + } + + /// If input ordering shares a common sort prefix with the TopK, and if the TopK's heap is full, + /// check if the computation can be finished early. + /// This is the case if the last row of the current batch is strictly worse than the worst row in the heap, + /// comparing only on the shared prefix columns. + fn attempt_early_completion(&mut self, batch: &RecordBatch) -> Result<()> { + // Early exit if the batch is empty as there is no last row to extract from it. + if batch.num_rows() == 0 { + return Ok(()); + } + + // prefix_row_converter is only `Some` if the input ordering has a common prefix with the TopK, + // so early exit if it is `None`. + let prefix_converter = match &self.common_prefix_converter { + Some(pc) => pc, + None => return Ok(()), + }; + + // Early exit if the heap is not full (`heap.max()` only returns `Some` if the heap is full). + let worst_topk_row = match self.heap.max() { + Some(row) => row, + None => return Ok(()), + }; Review Comment: Nit: you should be able to save some keystrokes with let-else statements here: ```suggestion // prefix_row_converter is only `Some` if the input ordering has a common prefix with the TopK, // so early exit if it is `None`. 
let Some(prefix_converter) = &self.common_prefix_converter else { return Ok(()) }; // Early exit if the heap is not full (`heap.max()` only returns `Some` if the heap is full). let Some(worst_topk_row) = self.heap.max() else { return Ok(()) }; ``` ########## datafusion/physical-expr/src/equivalence/properties/mod.rs: ########## @@ -575,10 +576,35 @@ impl EquivalenceProperties { // From the analysis above, we know that `[a ASC]` is satisfied. Then, // we add column `a` as constant to the algorithm state. This enables us // to deduce that `(b + c) ASC` is satisfied, given `a` is constant. - eq_properties = eq_properties - .with_constants(std::iter::once(ConstExpr::from(normalized_req.expr))); + eq_properties = eq_properties.with_constants(std::iter::once( + ConstExpr::from(Arc::clone(&normalized_req.expr)), + )); } - true + + // All requirements are satisfied. + normalized_reqs.len() + } + + /// Determines the longest prefix of `reqs` that is satisfied by the existing ordering. + /// Returns that prefix as a new `LexRequirement`, and a boolean indicating if all the requirements are satisfied. + pub fn extract_matching_prefix( + &self, + reqs: &LexRequirement, + ) -> (LexRequirement, bool) { + // First, standardize the given requirement: + let normalized_reqs = self.normalize_sort_requirements(reqs); Review Comment: I imagine that this will normalize cases where columns get aliased, duplicate sort expressions, etc. But would this normalize cases where expressions that do not alter the order are applied to columns? For example, would `[int_column+1 DESC, int_column DESC]` be normalized to `[int_column DESC]`? If that's the case, maybe add a SQL logic test for this? 
########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -183,6 +208,86 @@ impl TopK { // update memory reservation self.reservation.try_resize(self.size())?; + + // flag the topK as finished if we know that all + // subsequent batches are guaranteed to be worse than the + // current topK + self.attempt_early_completion(&batch)?; + + Ok(()) + } + + /// If input ordering shares a common sort prefix with the TopK, and if the TopK's heap is full, + /// check if the computation can be finished early. + /// This is the case if the last row of the current batch is strictly worse than the worst row in the heap, + /// comparing only on the shared prefix columns. + fn attempt_early_completion(&mut self, batch: &RecordBatch) -> Result<()> { + // Early exit if the batch is empty as there is no last row to extract from it. + if batch.num_rows() == 0 { + return Ok(()); + } + + // prefix_row_converter is only `Some` if the input ordering has a common prefix with the TopK, + // so early exit if it is `None`. + let prefix_converter = match &self.common_prefix_converter { + Some(pc) => pc, + None => return Ok(()), + }; + + // Early exit if the heap is not full (`heap.max()` only returns `Some` if the heap is full). + let worst_topk_row = match self.heap.max() { + Some(row) => row, + None => return Ok(()), + }; + + // Evaluate the prefix for the last row of the current batch. + let last_row_idx = batch.num_rows() - 1; + let mut batch_prefix_scratch = prefix_converter.empty_rows(1, 20); Review Comment: 1 here is the number of rows, and 20 is the initial capacity in bytes for storing the raw arrow data that represents the rows (just 1 in this case). This does not mean that only 20 bytes of data can be stored, IIUC this is just the amount of data that can be stored without dynamically resizing the underlying buffer, but upon storing more data, the underlying buffer will just get resized. This might be worth a comment. 
########## datafusion/sqllogictest/test_files/topk.slt: ########## @@ -233,3 +233,104 @@ d 1 -98 y7C453hRWd4E7ImjNDWlpexB8nUqjh y7C453hRWd4E7ImjNDWlpexB8nUqjh e 2 52 xipQ93429ksjNcXPX5326VSg1xJZcW xipQ93429ksjNcXPX5326VSg1xJZcW d 1 -72 wwXqSGKLyBQyPkonlzBNYUJTCo4LRS wwXqSGKLyBQyPkonlzBNYUJTCo4LRS a 1 -5 waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs + +##################################### +## Test TopK with Partially Sorted Inputs +##################################### + + +# Create an external table where data is pre-sorted by (column1 DESC, column2 ASC) only. +statement ok +CREATE EXTERNAL TABLE partial_sorted ( + column1 VARCHAR, + column2 INT, + column3 INT +) +STORED AS parquet +LOCATION 'test_files/scratch/topk/partial_sorted/1.parquet' +WITH ORDER (column1 DESC, column2 ASC); + +# Insert test data into the external table. +query I +COPY (VALUES + ('A', 12, 100), + ('A', 8, 50), + ('B', 9, 70), + ('B', 10, 80), + ('C', 7, 60), + ('C', 11, 90)) +TO 'test_files/scratch/topk/partial_sorted/1.parquet'; +---- +6 + +# Run a TopK query that orders by all columns. +# Although the table is only guaranteed to be sorted by (column1 DESC, column2 ASC), +# DataFusion should use the common prefix optimization +# and return the correct top 3 rows when ordering by all columns. +query TII +select column1, column2, column3 from partial_sorted order by column1 desc, column2 asc, column3 desc limit 3; Review Comment: How about adding some tests that try to mess around with `NULLS LAST` and `NULLS FIRST`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org