NGA-TRAN commented on code in PR #15563: URL: https://github.com/apache/datafusion/pull/15563#discussion_r2031191811
########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -681,4 +797,98 @@ mod tests { record_batch_store.unuse(0); assert_eq!(record_batch_store.batches_size, 0); } + + /// This test validates that the `try_finish` method marks the TopK operator as finished + /// when the prefix (on column "a") of the last row in the current batch is strictly greater + /// than the worst top‑k row. + /// The full sort expression is defined on both columns ("a", "b"), but the input ordering is only on "a". + #[tokio::test] + async fn test_try_finish_marks_finished_with_prefix() -> Result<()> { + // Create a schema with two columns. + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Float64, false), + ])); + + // Create sort expressions. + // Full sort: first by "a", then by "b". + let sort_expr_a = PhysicalSortExpr { + expr: col("a", schema.as_ref())?, + options: SortOptions::default(), + }; + let sort_expr_b = PhysicalSortExpr { + expr: col("b", schema.as_ref())?, + options: SortOptions::default(), + }; + + // Input ordering uses only column "a" (a prefix of the full sort). + let input_ordering = LexOrdering::from(vec![sort_expr_a.clone()]); + let full_expr = LexOrdering::from(vec![sort_expr_a, sort_expr_b]); + + // Create a dummy runtime environment and metrics. + let runtime = Arc::new(RuntimeEnv::default()); + let metrics = ExecutionPlanMetricsSet::new(); + + // Create a TopK instance with k = 3 and batch_size = 2. + let mut topk = TopK::try_new( + 0, + Arc::clone(&schema), + input_ordering, + full_expr, + 3, + 2, + runtime, + &metrics, + )?; + + // Create the first batch with two columns: + // Column "a": [1, 1, 2], Column "b": [20.0, 15.0, 30.0]. + let array_a1: ArrayRef = + Arc::new(Int32Array::from(vec![Some(1), Some(1), Some(2)])); + let array_b1: ArrayRef = Arc::new(Float64Array::from(vec![20.0, 15.0, 30.0])); + let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![array_a1, array_b1])?; + + // Insert the first batch. + // At this point the heap is not yet “finished” because the prefix of the last row of the batch + // is not strictly greater than the prefix of the worst top‑k row (both being `2`). + topk.insert_batch(batch1)?; + assert!( + !topk.finished, + "Expected 'finished' to be false after the first batch." + ); + + // Create the second batch with two columns: + // Column "a": [2, 3], Column "b": [10.0, 20.0]. + let array_a2: ArrayRef = Arc::new(Int32Array::from(vec![Some(2), Some(3)])); + let array_b2: ArrayRef = Arc::new(Float64Array::from(vec![10.0, 20.0])); + let batch2 = RecordBatch::try_new(Arc::clone(&schema), vec![array_a2, array_b2])?; + + // Insert the second batch. + // The last row in this batch has a prefix value of `3`, + // which is strictly greater than the worst top‑k row (with value `2`), + // so try_finish should mark the TopK as finished. + topk.insert_batch(batch2)?; + assert!( + topk.finished, + "Expected 'finished' to be true after the second batch." + ); + + // Verify the TopK correctly emits the top k rows from both batches + // (the value 10.0 for b is from the second batch). + let results: Vec<_> = topk.emit()?.try_collect().await?; + assert_batches_eq!( + &[ + "+---+------+", + "| a | b |", + "+---+------+", + "| 1 | 15.0 |", + "| 1 | 20.0 |", + "| 2 | 10.0 |", + "+---+------+", Review Comment: 👍 ########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -681,4 +797,98 @@ mod tests { record_batch_store.unuse(0); assert_eq!(record_batch_store.batches_size, 0); } + + /// This test validates that the `try_finish` method marks the TopK operator as finished + /// when the prefix (on column "a") of the last row in the current batch is strictly greater + /// than the worst top‑k row. + /// The full sort expression is defined on both columns ("a", "b"), but the input ordering is only on "a". + #[tokio::test] + async fn test_try_finish_marks_finished_with_prefix() -> Result<()> { + // Create a schema with two columns. + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Float64, false), + ])); + + // Create sort expressions. + // Full sort: first by "a", then by "b". + let sort_expr_a = PhysicalSortExpr { + expr: col("a", schema.as_ref())?, + options: SortOptions::default(), + }; + let sort_expr_b = PhysicalSortExpr { + expr: col("b", schema.as_ref())?, + options: SortOptions::default(), + }; + + // Input ordering uses only column "a" (a prefix of the full sort). + let input_ordering = LexOrdering::from(vec![sort_expr_a.clone()]); + let full_expr = LexOrdering::from(vec![sort_expr_a, sort_expr_b]); + + // Create a dummy runtime environment and metrics. + let runtime = Arc::new(RuntimeEnv::default()); + let metrics = ExecutionPlanMetricsSet::new(); + + // Create a TopK instance with k = 3 and batch_size = 2. + let mut topk = TopK::try_new( + 0, + Arc::clone(&schema), + input_ordering, + full_expr, + 3, + 2, + runtime, + &metrics, + )?; + + // Create the first batch with two columns: + // Column "a": [1, 1, 2], Column "b": [20.0, 15.0, 30.0]. + let array_a1: ArrayRef = + Arc::new(Int32Array::from(vec![Some(1), Some(1), Some(2)])); + let array_b1: ArrayRef = Arc::new(Float64Array::from(vec![20.0, 15.0, 30.0])); Review Comment: Nice test to cover both general and corner cases ########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -681,4 +797,98 @@ mod tests { record_batch_store.unuse(0); assert_eq!(record_batch_store.batches_size, 0); } + + /// This test validates that the `try_finish` method marks the TopK operator as finished + /// when the prefix (on column "a") of the last row in the current batch is strictly greater + /// than the worst top‑k row. + /// The full sort expression is defined on both columns ("a", "b"), but the input ordering is only on "a". + #[tokio::test] + async fn test_try_finish_marks_finished_with_prefix() -> Result<()> { + // Create a schema with two columns. + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Float64, false), + ])); + + // Create sort expressions. + // Full sort: first by "a", then by "b". + let sort_expr_a = PhysicalSortExpr { + expr: col("a", schema.as_ref())?, + options: SortOptions::default(), + }; + let sort_expr_b = PhysicalSortExpr { + expr: col("b", schema.as_ref())?, + options: SortOptions::default(), + }; + + // Input ordering uses only column "a" (a prefix of the full sort). + let input_ordering = LexOrdering::from(vec![sort_expr_a.clone()]); + let full_expr = LexOrdering::from(vec![sort_expr_a, sort_expr_b]); + + // Create a dummy runtime environment and metrics. + let runtime = Arc::new(RuntimeEnv::default()); + let metrics = ExecutionPlanMetricsSet::new(); + + // Create a TopK instance with k = 3 and batch_size = 2. + let mut topk = TopK::try_new( + 0, + Arc::clone(&schema), + input_ordering, + full_expr, + 3, + 2, + runtime, + &metrics, + )?; + + // Create the first batch with two columns: + // Column "a": [1, 1, 2], Column "b": [20.0, 15.0, 30.0]. + let array_a1: ArrayRef = + Arc::new(Int32Array::from(vec![Some(1), Some(1), Some(2)])); + let array_b1: ArrayRef = Arc::new(Float64Array::from(vec![20.0, 15.0, 30.0])); + let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![array_a1, array_b1])?; + + // Insert the first batch. + // At this point the heap is not yet “finished” because the prefix of the last row of the batch + // is not strictly greater than the prefix of the worst top‑k row (both being `2`). + topk.insert_batch(batch1)?; + assert!( + !topk.finished, + "Expected 'finished' to be false after the first batch." + ); + + // Create the second batch with two columns: + // Column "a": [2, 3], Column "b": [10.0, 20.0]. + let array_a2: ArrayRef = Arc::new(Int32Array::from(vec![Some(2), Some(3)])); + let array_b2: ArrayRef = Arc::new(Float64Array::from(vec![10.0, 20.0])); Review Comment: 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org