NGA-TRAN commented on code in PR #15563: URL: https://github.com/apache/datafusion/pull/15563#discussion_r2029861706
########## datafusion/physical-expr/src/equivalence/properties/mod.rs: ########## @@ -575,10 +576,35 @@ impl EquivalenceProperties { // From the analysis above, we know that `[a ASC]` is satisfied. Then, // we add column `a` as constant to the algorithm state. This enables us // to deduce that `(b + c) ASC` is satisfied, given `a` is constant. - eq_properties = eq_properties - .with_constants(std::iter::once(ConstExpr::from(normalized_req.expr))); + eq_properties = eq_properties.with_constants(std::iter::once( + ConstExpr::from(Arc::clone(&normalized_req.expr)), + )); } - true + + // All requirements are satisfied. + normalized_reqs.len() + } + + /// Determines the longest prefix of `reqs` that is satisfied by the existing ordering. + /// Returns that prefix as a new `LexRequirement`, and a boolean indicating if all the requirements are satisfied. + pub fn extract_matching_prefix( + &self, + reqs: &LexRequirement, + ) -> (LexRequirement, bool) { + // First, standardize the given requirement: + let normalized_reqs = self.normalize_sort_requirements(reqs); + + let prefix_len = self.compute_matching_prefix_length(&normalized_reqs); + ( + LexRequirement::new(normalized_reqs[..prefix_len].to_vec()), + prefix_len == normalized_reqs.len(), + ) + } + + /// Checks whether the given sort requirements are satisfied by any of the + /// existing orderings. + pub fn ordering_satisfy_requirement(&self, reqs: &LexRequirement) -> bool { + self.extract_matching_prefix(reqs).1 Review Comment: I have carefully reviewed this mod.rs file. 
Very nice work to refactor and add functions to get the sorted prefix ########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -183,6 +208,86 @@ impl TopK { // update memory reservation self.reservation.try_resize(self.size())?; + + // flag the topK as finished if we know that all + // subsequent batches are guaranteed to be worse than the + // current topK + self.attempt_early_completion(&batch)?; + + Ok(()) + } + + /// If input ordering shares a common sort prefix with the TopK, and if the TopK's heap is full, + /// check if the computation can be finished early. + /// This is the case if the last row of the current batch is strictly worse than the worst row in the heap, + /// comparing only on the shared prefix columns. + fn attempt_early_completion(&mut self, batch: &RecordBatch) -> Result<()> { + // Early exit if the batch is empty as there is no last row to extract from it. + if batch.num_rows() == 0 { + return Ok(()); + } + + // prefix_row_converter is only `Some` if the input ordering has a common prefix with the TopK, + // so early exit if it is `None`. + let prefix_converter = match &self.common_prefix_converter { + Some(pc) => pc, + None => return Ok(()), + }; + + // Early exit if the heap is not full (`heap.max()` only returns `Some` if the heap is full). + let worst_topk_row = match self.heap.max() { + Some(row) => row, + None => return Ok(()), + }; + + // Evaluate the prefix for the last row of the current batch. + let last_row_idx = batch.num_rows() - 1; + let mut batch_prefix_scratch = prefix_converter.empty_rows(1, 20); + self.compute_common_prefix(batch, last_row_idx, &mut batch_prefix_scratch)?; + + // Retrieve the "worst" row from the heap. 
+ let store_entry = self + .heap + .store + .get(worst_topk_row.batch_id) + .ok_or(internal_datafusion_err!("Invalid batch id in topK heap"))?; + let worst_batch = &store_entry.batch; + let mut heap_prefix_scratch = prefix_converter.empty_rows(1, 20); + self.compute_common_prefix( + worst_batch, + worst_topk_row.index, + &mut heap_prefix_scratch, + )?; + + // If the last row's prefix is strictly greater than the worst prefix, mark as finished. + if batch_prefix_scratch.row(0).as_ref() > heap_prefix_scratch.row(0).as_ref() { + self.finished = true; Review Comment: Nice ########## datafusion/sqllogictest/test_files/topk.slt: ########## @@ -233,3 +233,104 @@ d 1 -98 y7C453hRWd4E7ImjNDWlpexB8nUqjh y7C453hRWd4E7ImjNDWlpexB8nUqjh e 2 52 xipQ93429ksjNcXPX5326VSg1xJZcW xipQ93429ksjNcXPX5326VSg1xJZcW d 1 -72 wwXqSGKLyBQyPkonlzBNYUJTCo4LRS wwXqSGKLyBQyPkonlzBNYUJTCo4LRS a 1 -5 waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs + +##################################### +## Test TopK with Partially Sorted Inputs +##################################### + + +# Create an external table where data is pre-sorted by (column1 DESC, column2 ASC) only. +statement ok +CREATE EXTERNAL TABLE partial_sorted ( + column1 VARCHAR, + column2 INT, + column3 INT +) +STORED AS parquet +LOCATION 'test_files/scratch/topk/partial_sorted/1.parquet' +WITH ORDER (column1 DESC, column2 ASC); + +# Insert test data into the external table. +query I +COPY (VALUES + ('A', 12, 100), + ('A', 8, 50), + ('B', 9, 70), + ('B', 10, 80), + ('C', 7, 60), + ('C', 11, 90)) +TO 'test_files/scratch/topk/partial_sorted/1.parquet'; +---- +6 + +# Run a TopK query that orders by all columns. +# Although the table is only guaranteed to be sorted by (column1 DESC, column2 ASC), +# DataFusion should use the common prefix optimization +# and return the correct top 3 rows when ordering by all columns. 
+query TII +select column1, column2, column3 from partial_sorted order by column1 desc, column2 asc, column3 desc limit 3; +---- +C 7 60 +C 11 90 +B 9 70 + +## explain_physical_plan_only +statement ok +set datafusion.explain.physical_plan_only = true + +# Verify that the physical plan includes the sort prefix. +# The output should display a "sort_prefix" in the SortExec node. +query TT +explain select column1, column2, column3 from partial_sorted order by column1 desc, column2 asc, column3 desc limit 3; +---- +physical_plan +01)SortExec: TopK(fetch=3), expr=[column1@0 DESC, column2@1 ASC NULLS LAST, column3@2 DESC], preserve_partitioning=[false], sort_prefix=[column1@0 DESC, column2@1 ASC NULLS LAST] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[column1, column2, column3], output_ordering=[column1@0 DESC, column2@1 ASC NULLS LAST], file_type=parquet + + +# Explain variations of the above query with different orderings, and different sort prefixes. +# The "sort_prefix" in the SortExec node should only be present if the TopK's ordering starts with either (column1 DESC, column2 ASC) or just (column1 DESC). 
+query TT +explain select column1, column2, column3 from partial_sorted order by column3 desc limit 3; +---- +physical_plan +01)SortExec: TopK(fetch=3), expr=[column3@2 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[column1, column2, column3], output_ordering=[column1@0 DESC, column2@1 ASC NULLS LAST], file_type=parquet + +query TT +explain select column1, column2, column3 from partial_sorted order by column1 desc, column2 desc limit 3; +---- +physical_plan +01)SortExec: TopK(fetch=3), expr=[column1@0 DESC, column2@1 DESC], preserve_partitioning=[false], sort_prefix=[column1@0 DESC] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[column1, column2, column3], output_ordering=[column1@0 DESC, column2@1 ASC NULLS LAST], file_type=parquet + +query TT +explain select column1, column2, column3 from partial_sorted order by column1 asc limit 3; +---- +physical_plan +01)SortExec: TopK(fetch=3), expr=[column1@0 ASC NULLS LAST], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[column1, column2, column3], output_ordering=[column1@0 DESC, column2@1 ASC NULLS LAST], file_type=parquet + +query TT +explain select column1, column2, column3 from partial_sorted order by column2 asc, column1 desc limit 3; +---- +physical_plan +01)SortExec: TopK(fetch=3), expr=[column2@1 ASC NULLS LAST, column1@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[column1, column2, column3], output_ordering=[column1@0 DESC, column2@1 ASC NULLS LAST], file_type=parquet + + +# Verify that the sort prefix is 
correctly computed on the normalized ordering (removing redundant aliased columns for example). +query TT +explain select column1, column2, column3, column1 as column4, column2 as column5 from partial_sorted order by column1 desc, column4 desc, column2 asc, column5 asc, column3 desc limit 3; +---- +physical_plan +01)SortExec: TopK(fetch=3), expr=[column1@0 DESC, column4@3 DESC, column2@1 ASC NULLS LAST, column5@4 ASC NULLS LAST, column3@2 DESC], preserve_partitioning=[false], sort_prefix=[column1@0 DESC, column2@1 ASC NULLS LAST] +02)--ProjectionExec: expr=[column1@0 as column1, column2@1 as column2, column3@2 as column3, column1@0 as column4, column2@1 as column5] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[column1, column2, column3], output_ordering=[column1@0 DESC, column2@1 ASC NULLS LAST], file_type=parquet + +# Cleanup +statement ok +DROP TABLE partial_sorted; + +statement ok +set datafusion.explain.physical_plan_only = false Review Comment: Very nice explain tests. 
🚀 ########## datafusion/core/tests/physical_optimizer/enforce_sorting.rs: ########## @@ -1652,7 +1652,7 @@ async fn test_remove_unnecessary_sort7() -> Result<()> { ) as Arc<dyn ExecutionPlan>; let expected_input = [ - "SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]", + "SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false], sort_prefix=[non_nullable_col@1 ASC]", Review Comment: Very nice and clear addition to the explain ########## datafusion/physical-plan/src/sorts/sort.rs: ########## @@ -1114,11 +1130,14 @@ impl SortExec { let output_partitioning = Self::output_partitioning_helper(input, preserve_partitioning); - PlanProperties::new( - eq_properties, - output_partitioning, - emission_type, - boundedness, + ( + PlanProperties::new( + eq_properties, + output_partitioning, + emission_type, + boundedness, + ), + LexOrdering::from(sort_prefix), Review Comment: Very nice addition logic in this `compute_properties` function ########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -681,4 +793,98 @@ mod tests { record_batch_store.unuse(0); assert_eq!(record_batch_store.batches_size, 0); } + + /// This test validates that the `try_finish` method marks the TopK operator as finished + /// when the prefix (on column "a") of the last row in the current batch is strictly greater + /// than the worst top‑k row. + /// The full sort expression is defined on both columns ("a", "b"), but the input ordering is only on "a". + #[tokio::test] + async fn test_try_finish_marks_finished_with_prefix() -> Result<()> { + // Create a schema with two columns. + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Float64, false), + ])); + + // Create sort expressions. + // Full sort: first by "a", then by "b". 
+ let sort_expr_a = PhysicalSortExpr { + expr: col("a", schema.as_ref())?, + options: SortOptions::default(), + }; + let sort_expr_b = PhysicalSortExpr { + expr: col("b", schema.as_ref())?, + options: SortOptions::default(), + }; + + // Input ordering uses only column "a" (a prefix of the full sort). + let input_ordering = LexOrdering::from(vec![sort_expr_a.clone()]); + let full_expr = LexOrdering::from(vec![sort_expr_a, sort_expr_b]); + + // Create a dummy runtime environment and metrics. + let runtime = Arc::new(RuntimeEnv::default()); + let metrics = ExecutionPlanMetricsSet::new(); + + // Create a TopK instance with k = 3 and batch_size = 2. + let mut topk = TopK::try_new( + 0, + Arc::clone(&schema), + input_ordering, + full_expr, + 3, + 2, + runtime, + &metrics, + )?; + + // Create the first batch with two columns: + // Column "a": [1, 1, 2], Column "b": [10.0, 20.0, 10.0]. Review Comment: To further test that the output data is sorted on both a and b, I suggest making the data of column b like this: `"b": [20.0, 10.0, 30.0]`. 
########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -108,17 +131,7 @@ impl TopK { let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]")) .register(&runtime.memory_pool); - let expr: Arc<[PhysicalSortExpr]> = expr.into(); - - let sort_fields: Vec<_> = expr - .iter() - .map(|e| { - Ok(SortField::new_with_options( - e.expr.data_type(&schema)?, - e.options, - )) - }) - .collect::<Result<_>>()?; + let sort_fields: Vec<_> = build_sort_fields(&expr, &schema)?; Review Comment: Nice refactor ########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -183,6 +208,86 @@ impl TopK { // update memory reservation self.reservation.try_resize(self.size())?; + + // flag the topK as finished if we know that all + // subsequent batches are guaranteed to be worse than the + // current topK + self.attempt_early_completion(&batch)?; + + Ok(()) + } + + /// If input ordering shares a common sort prefix with the TopK, and if the TopK's heap is full, + /// check if the computation can be finished early. + /// This is the case if the last row of the current batch is strictly worse than the worst row in the heap, + /// comparing only on the shared prefix columns. + fn attempt_early_completion(&mut self, batch: &RecordBatch) -> Result<()> { + // Early exit if the batch is empty as there is no last row to extract from it. + if batch.num_rows() == 0 { + return Ok(()); + } + + // prefix_row_converter is only `Some` if the input ordering has a common prefix with the TopK, + // so early exit if it is `None`. + let prefix_converter = match &self.common_prefix_converter { + Some(pc) => pc, + None => return Ok(()), + }; + + // Early exit if the heap is not full (`heap.max()` only returns `Some` if the heap is full). + let worst_topk_row = match self.heap.max() { + Some(row) => row, + None => return Ok(()), + }; + + // Evaluate the prefix for the last row of the current batch. 
+ let last_row_idx = batch.num_rows() - 1; + let mut batch_prefix_scratch = prefix_converter.empty_rows(1, 20); + self.compute_common_prefix(batch, last_row_idx, &mut batch_prefix_scratch)?; + + // Retrieve the "worst" row from the heap. + let store_entry = self + .heap + .store + .get(worst_topk_row.batch_id) + .ok_or(internal_datafusion_err!("Invalid batch id in topK heap"))?; + let worst_batch = &store_entry.batch; + let mut heap_prefix_scratch = prefix_converter.empty_rows(1, 20); Review Comment: Same here ########## datafusion/sqllogictest/test_files/topk.slt: ########## @@ -233,3 +233,104 @@ d 1 -98 y7C453hRWd4E7ImjNDWlpexB8nUqjh y7C453hRWd4E7ImjNDWlpexB8nUqjh e 2 52 xipQ93429ksjNcXPX5326VSg1xJZcW xipQ93429ksjNcXPX5326VSg1xJZcW d 1 -72 wwXqSGKLyBQyPkonlzBNYUJTCo4LRS wwXqSGKLyBQyPkonlzBNYUJTCo4LRS a 1 -5 waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs + +##################################### +## Test TopK with Partially Sorted Inputs +##################################### + + +# Create an external table where data is pre-sorted by (column1 DESC, column2 ASC) only. +statement ok +CREATE EXTERNAL TABLE partial_sorted ( + column1 VARCHAR, + column2 INT, + column3 INT +) +STORED AS parquet +LOCATION 'test_files/scratch/topk/partial_sorted/1.parquet' +WITH ORDER (column1 DESC, column2 ASC); + +# Insert test data into the external table. +query I +COPY (VALUES + ('A', 12, 100), + ('A', 8, 50), + ('B', 9, 70), + ('B', 10, 80), + ('C', 7, 60), + ('C', 11, 90)) +TO 'test_files/scratch/topk/partial_sorted/1.parquet'; +---- +6 + +# Run a TopK query that orders by all columns. +# Although the table is only guaranteed to be sorted by (column1 DESC, column2 ASC), +# DataFusion should use the common prefix optimization +# and return the correct top 3 rows when ordering by all columns. 
+query TII +select column1, column2, column3 from partial_sorted order by column1 desc, column2 asc, column3 desc limit 3; +---- +C 7 60 +C 11 90 +B 9 70 + +## explain_physical_plan_only +statement ok +set datafusion.explain.physical_plan_only = true + +# Verify that the physical plan includes the sort prefix. +# The output should display a "sort_prefix" in the SortExec node. +query TT +explain select column1, column2, column3 from partial_sorted order by column1 desc, column2 asc, column3 desc limit 3; +---- +physical_plan +01)SortExec: TopK(fetch=3), expr=[column1@0 DESC, column2@1 ASC NULLS LAST, column3@2 DESC], preserve_partitioning=[false], sort_prefix=[column1@0 DESC, column2@1 ASC NULLS LAST] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[column1, column2, column3], output_ordering=[column1@0 DESC, column2@1 ASC NULLS LAST], file_type=parquet + + +# Explain variations of the above query with different orderings, and different sort prefixes. +# The "sort_prefix" in the SortExec node should only be present if the TopK's ordering starts with either (column1 DESC, column2 ASC) or just (column1 DESC). +query TT +explain select column1, column2, column3 from partial_sorted order by column3 desc limit 3; +---- +physical_plan +01)SortExec: TopK(fetch=3), expr=[column3@2 DESC], preserve_partitioning=[false] Review Comment: 👍 no sort_prefix ########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -183,6 +208,86 @@ impl TopK { // update memory reservation self.reservation.try_resize(self.size())?; + + // flag the topK as finished if we know that all + // subsequent batches are guaranteed to be worse than the + // current topK + self.attempt_early_completion(&batch)?; + + Ok(()) + } + + /// If input ordering shares a common sort prefix with the TopK, and if the TopK's heap is full, + /// check if the computation can be finished early. 
+ /// This is the case if the last row of the current batch is strictly worse than the worst row in the heap, + /// comparing only on the shared prefix columns. + fn attempt_early_completion(&mut self, batch: &RecordBatch) -> Result<()> { + // Early exit if the batch is empty as there is no last row to extract from it. + if batch.num_rows() == 0 { + return Ok(()); + } + + // prefix_row_converter is only `Some` if the input ordering has a common prefix with the TopK, + // so early exit if it is `None`. + let prefix_converter = match &self.common_prefix_converter { + Some(pc) => pc, + None => return Ok(()), + }; + + // Early exit if the heap is not full (`heap.max()` only returns `Some` if the heap is full). + let worst_topk_row = match self.heap.max() { + Some(row) => row, + None => return Ok(()), + }; + + // Evaluate the prefix for the last row of the current batch. + let last_row_idx = batch.num_rows() - 1; + let mut batch_prefix_scratch = prefix_converter.empty_rows(1, 20); Review Comment: Can you add a comment explaining why 1 and 20? Could they be defined as constants, or at least assigned to named variables, with those names used here instead? ########## datafusion/sqllogictest/test_files/topk.slt: ########## @@ -233,3 +233,104 @@ d 1 -98 y7C453hRWd4E7ImjNDWlpexB8nUqjh y7C453hRWd4E7ImjNDWlpexB8nUqjh e 2 52 xipQ93429ksjNcXPX5326VSg1xJZcW xipQ93429ksjNcXPX5326VSg1xJZcW d 1 -72 wwXqSGKLyBQyPkonlzBNYUJTCo4LRS wwXqSGKLyBQyPkonlzBNYUJTCo4LRS a 1 -5 waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs + +##################################### +## Test TopK with Partially Sorted Inputs +##################################### + + +# Create an external table where data is pre-sorted by (column1 DESC, column2 ASC) only. 
+statement ok +CREATE EXTERNAL TABLE partial_sorted ( + column1 VARCHAR, + column2 INT, + column3 INT +) +STORED AS parquet +LOCATION 'test_files/scratch/topk/partial_sorted/1.parquet' +WITH ORDER (column1 DESC, column2 ASC); + +# Insert test data into the external table. +query I +COPY (VALUES + ('A', 12, 100), + ('A', 8, 50), + ('B', 9, 70), + ('B', 10, 80), + ('C', 7, 60), + ('C', 11, 90)) +TO 'test_files/scratch/topk/partial_sorted/1.parquet'; +---- +6 + +# Run a TopK query that orders by all columns. +# Although the table is only guaranteed to be sorted by (column1 DESC, column2 ASC), +# DataFusion should use the common prefix optimization +# and return the correct top 3 rows when ordering by all columns. +query TII +select column1, column2, column3 from partial_sorted order by column1 desc, column2 asc, column3 desc limit 3; +---- +C 7 60 +C 11 90 +B 9 70 + +## explain_physical_plan_only +statement ok +set datafusion.explain.physical_plan_only = true + +# Verify that the physical plan includes the sort prefix. +# The output should display a "sort_prefix" in the SortExec node. +query TT +explain select column1, column2, column3 from partial_sorted order by column1 desc, column2 asc, column3 desc limit 3; +---- +physical_plan +01)SortExec: TopK(fetch=3), expr=[column1@0 DESC, column2@1 ASC NULLS LAST, column3@2 DESC], preserve_partitioning=[false], sort_prefix=[column1@0 DESC, column2@1 ASC NULLS LAST] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[column1, column2, column3], output_ordering=[column1@0 DESC, column2@1 ASC NULLS LAST], file_type=parquet + + +# Explain variations of the above query with different orderings, and different sort prefixes. +# The "sort_prefix" in the SortExec node should only be present if the TopK's ordering starts with either (column1 DESC, column2 ASC) or just (column1 DESC). 
+query TT +explain select column1, column2, column3 from partial_sorted order by column3 desc limit 3; +---- +physical_plan +01)SortExec: TopK(fetch=3), expr=[column3@2 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[column1, column2, column3], output_ordering=[column1@0 DESC, column2@1 ASC NULLS LAST], file_type=parquet + +query TT +explain select column1, column2, column3 from partial_sorted order by column1 desc, column2 desc limit 3; +---- +physical_plan +01)SortExec: TopK(fetch=3), expr=[column1@0 DESC, column2@1 DESC], preserve_partitioning=[false], sort_prefix=[column1@0 DESC] Review Comment: 👍 one-column sort prefix ########## datafusion/sqllogictest/test_files/topk.slt: ########## @@ -233,3 +233,104 @@ d 1 -98 y7C453hRWd4E7ImjNDWlpexB8nUqjh y7C453hRWd4E7ImjNDWlpexB8nUqjh e 2 52 xipQ93429ksjNcXPX5326VSg1xJZcW xipQ93429ksjNcXPX5326VSg1xJZcW d 1 -72 wwXqSGKLyBQyPkonlzBNYUJTCo4LRS wwXqSGKLyBQyPkonlzBNYUJTCo4LRS a 1 -5 waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs + +##################################### +## Test TopK with Partially Sorted Inputs +##################################### + + +# Create an external table where data is pre-sorted by (column1 DESC, column2 ASC) only. +statement ok +CREATE EXTERNAL TABLE partial_sorted ( + column1 VARCHAR, + column2 INT, + column3 INT +) +STORED AS parquet +LOCATION 'test_files/scratch/topk/partial_sorted/1.parquet' +WITH ORDER (column1 DESC, column2 ASC); + +# Insert test data into the external table. +query I +COPY (VALUES + ('A', 12, 100), + ('A', 8, 50), + ('B', 9, 70), + ('B', 10, 80), + ('C', 7, 60), + ('C', 11, 90)) +TO 'test_files/scratch/topk/partial_sorted/1.parquet'; +---- +6 + +# Run a TopK query that orders by all columns. 
+# Although the table is only guaranteed to be sorted by (column1 DESC, column2 ASC), +# DataFusion should use the common prefix optimization +# and return the correct top 3 rows when ordering by all columns. +query TII +select column1, column2, column3 from partial_sorted order by column1 desc, column2 asc, column3 desc limit 3; +---- +C 7 60 +C 11 90 +B 9 70 + +## explain_physical_plan_only +statement ok +set datafusion.explain.physical_plan_only = true + +# Verify that the physical plan includes the sort prefix. +# The output should display a "sort_prefix" in the SortExec node. +query TT +explain select column1, column2, column3 from partial_sorted order by column1 desc, column2 asc, column3 desc limit 3; +---- +physical_plan +01)SortExec: TopK(fetch=3), expr=[column1@0 DESC, column2@1 ASC NULLS LAST, column3@2 DESC], preserve_partitioning=[false], sort_prefix=[column1@0 DESC, column2@1 ASC NULLS LAST] Review Comment: Nice 2-column sort_prefix ########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -681,4 +793,98 @@ mod tests { record_batch_store.unuse(0); assert_eq!(record_batch_store.batches_size, 0); } + + /// This test validates that the `try_finish` method marks the TopK operator as finished + /// when the prefix (on column "a") of the last row in the current batch is strictly greater + /// than the worst top‑k row. + /// The full sort expression is defined on both columns ("a", "b"), but the input ordering is only on "a". + #[tokio::test] + async fn test_try_finish_marks_finished_with_prefix() -> Result<()> { + // Create a schema with two columns. + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Float64, false), + ])); + + // Create sort expressions. + // Full sort: first by "a", then by "b". 
+ let sort_expr_a = PhysicalSortExpr { + expr: col("a", schema.as_ref())?, + options: SortOptions::default(), + }; + let sort_expr_b = PhysicalSortExpr { + expr: col("b", schema.as_ref())?, + options: SortOptions::default(), + }; + + // Input ordering uses only column "a" (a prefix of the full sort). + let input_ordering = LexOrdering::from(vec![sort_expr_a.clone()]); + let full_expr = LexOrdering::from(vec![sort_expr_a, sort_expr_b]); + + // Create a dummy runtime environment and metrics. + let runtime = Arc::new(RuntimeEnv::default()); + let metrics = ExecutionPlanMetricsSet::new(); + + // Create a TopK instance with k = 3 and batch_size = 2. + let mut topk = TopK::try_new( + 0, + Arc::clone(&schema), + input_ordering, + full_expr, + 3, + 2, + runtime, + &metrics, + )?; + + // Create the first batch with two columns: + // Column "a": [1, 1, 2], Column "b": [10.0, 20.0, 10.0]. + let array_a1: ArrayRef = + Arc::new(Int32Array::from(vec![Some(1), Some(1), Some(3)])); + let array_b1: ArrayRef = Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])); + let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![array_a1, array_b1])?; + + // Insert the first batch. + // At this point the heap is not yet “finished” because the prefix (last row value in column "a" is 3) + // is not strictly greater than the worst top‑k row. + topk.insert_batch(batch1)?; + assert!( + !topk.finished, + "Expected 'finished' to be false after the first batch." + ); + + // Create the second batch with two columns: + // Column "a": [2, 3], Column "b": [30.0, 10.0]. Review Comment: Make `"b": [20.0, 10.0]`. The purpose of this change and the one above is to have the data still return the top 3 rows, but with the third row coming from batch2. Also, the first 2 rows of batch1 will then get sorted on both a and b. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org