geoffreyclaude commented on code in PR #22991:
URL: https://github.com/apache/datafusion/pull/22991#discussion_r3437038200
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -2764,6 +2799,71 @@ mod tests {
Ok(())
}
+ async fn emit_sort_partition(
+ sort: &Arc<SortExec>,
+ partition: usize,
+ task_ctx: Arc<TaskContext>,
+ ) -> Result<()> {
+ let _batches: Vec<RecordBatch> =
+ sort.execute(partition, task_ctx)?.try_collect().await?;
+ Ok(())
+ }
+
+ fn assert_filter_still_waiting(filter: &Arc<DynamicFilterPhysicalExpr>) {
+ let dynamic_filter_expr: Arc<dyn PhysicalExpr> =
+ Arc::<DynamicFilterPhysicalExpr>::clone(filter);
+ assert!(
+ matches!(
+ DynamicFilterTracking::classify(&dynamic_filter_expr),
+ DynamicFilterTracking::Watching(_)
+ ),
+ "the shared filter should remain watchable until every partition emits"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_preserved_topk_filter_waits_for_all_sort_partitions() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+ let partitions = vec![
+ vec![RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![Arc::new(Int32Array::from(vec![3, 1, 2]))],
+ )?],
+ vec![RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![Arc::new(Int32Array::from(vec![6, 4, 5]))],
+ )?],
+ ];
+ let input = TestMemoryExec::try_new_exec(&partitions, Arc::clone(&schema), None)?;
+ let sort = SortExec::new(
+ [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(),
+ input,
+ )
+ // `with_fetch` creates the TopK filter; preserving partitioning after
+ // that must rebuild it with one emitter per output partition.
+ .with_fetch(Some(2))
Review Comment:
Good catch. I changed `with_fetch` in 4c32f8a0c so an existing dynamic
filter keeps its expression, but the `TopKDynamicFilters` wrapper is rebuilt
against the current output partitioning instead of being cloned with possibly
stale emitter state. I also added
`test_with_fetch_rebuilds_existing_topk_filter` for the ordering where the
filter exists before fetch is set.
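To illustrate the idea behind the fix (keep the filter expression, but rebuild the emitter tracking for the current partition count instead of cloning it), here is a minimal standalone sketch. `SharedFilterState`, `rebuild_for`, and `partition_done` are hypothetical names chosen for this example; they are not DataFusion's `TopKDynamicFilters` API, and the expression is modeled as a plain `String`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for a shared dynamic filter: it stays "watching"
/// until every output partition has reported that it finished emitting.
struct SharedFilterState {
    expr: String, // placeholder for the filter expression
    remaining_emitters: AtomicUsize,
}

impl SharedFilterState {
    fn new(expr: String, partitions: usize) -> Self {
        Self { expr, remaining_emitters: AtomicUsize::new(partitions) }
    }

    /// Keep the expression, but start fresh emitter tracking sized to the
    /// current output partitioning instead of copying (possibly stale) counts.
    fn rebuild_for(&self, partitions: usize) -> Self {
        Self::new(self.expr.clone(), partitions)
    }

    /// Called when one partition finishes; returns true only for the last one.
    /// Extra reports after completion are ignored instead of underflowing.
    fn partition_done(&self) -> bool {
        match self.remaining_emitters.fetch_update(
            Ordering::AcqRel,
            Ordering::Acquire,
            |n| n.checked_sub(1),
        ) {
            Ok(prev) => prev == 1,
            Err(_) => false,
        }
    }

    fn is_watching(&self) -> bool {
        self.remaining_emitters.load(Ordering::Acquire) > 0
    }
}

fn main() {
    // Filter created while the sort had a single output partition.
    let original = SharedFilterState::new("a < 2".to_string(), 1);
    // Partitioning is preserved with two output partitions: rebuild, don't clone.
    let rebuilt = original.rebuild_for(2);

    assert_eq!(rebuilt.expr, original.expr);
    assert!(!rebuilt.partition_done()); // first partition done
    assert!(rebuilt.is_watching()); // still waiting for the second
    assert!(rebuilt.partition_done()); // last partition done
    assert!(!rebuilt.is_watching());
}
```

Sizing the counter at rebuild time is what lets the filter keep waiting until every partition emits, which mirrors what `test_preserved_topk_filter_waits_for_all_sort_partitions` checks with `DynamicFilterTracking::Watching`.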