geoffreyclaude commented on code in PR #22991:
URL: https://github.com/apache/datafusion/pull/22991#discussion_r3437038200
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -2764,6 +2799,71 @@ mod tests {
Ok(())
}
+ async fn emit_sort_partition(
+ sort: &Arc<SortExec>,
+ partition: usize,
+ task_ctx: Arc<TaskContext>,
+ ) -> Result<()> {
+ let _batches: Vec<RecordBatch> =
+ sort.execute(partition, task_ctx)?.try_collect().await?;
+ Ok(())
+ }
+
+ fn assert_filter_still_waiting(filter: &Arc<DynamicFilterPhysicalExpr>) {
+ let dynamic_filter_expr: Arc<dyn PhysicalExpr> =
+ Arc::<DynamicFilterPhysicalExpr>::clone(filter);
+ assert!(
+ matches!(
+ DynamicFilterTracking::classify(&dynamic_filter_expr),
+ DynamicFilterTracking::Watching(_)
+ ),
+ "the shared filter should remain watchable until every partition emits"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_preserved_topk_filter_waits_for_all_sort_partitions() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+ let partitions = vec![
+ vec![RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![Arc::new(Int32Array::from(vec![3, 1, 2]))],
+ )?],
+ vec![RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![Arc::new(Int32Array::from(vec![6, 4, 5]))],
+ )?],
+ ];
+ let input = TestMemoryExec::try_new_exec(&partitions, Arc::clone(&schema), None)?;
+ let sort = SortExec::new(
+ [PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)))].into(),
+ input,
+ )
+ // `with_fetch` creates the TopK filter; preserving partitioning after
+ // that must rebuild it with one emitter per output partition.
+ .with_fetch(Some(2))
Review Comment:
Good catch. I changed with_fetch in 4c32f8a0c so an existing dynamic filter
keeps its expression, but the TopKDynamicFilters wrapper is rebuilt against the
current output partitioning instead of being cloned with possibly stale emitter
state. I also added test_with_fetch_rebuilds_existing_topk_filter for the
ordering where the filter exists before fetch is set.
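
For readers skimming the thread, here is a minimal, std-only sketch of the idea described above: reuse the shared expression, but rebuild the per-partition bookkeeping instead of cloning it. The types `FilterExpr` and `TopKFilterWrapper`, and the methods `rebuild_for`, `mark_partition_done`, and `is_complete`, are hypothetical stand-ins invented for illustration. They are not DataFusion's `DynamicFilterPhysicalExpr` or `TopKDynamicFilters`, and the actual change in 4c32f8a0c may be structured differently.

```rust
use std::sync::Arc;

// Hypothetical stand-in for the shared dynamic filter expression.
#[derive(Debug)]
struct FilterExpr {
    name: String,
}

// Hypothetical stand-in for the wrapper that tracks how many output
// partitions still have to emit before the filter counts as complete.
#[derive(Debug)]
struct TopKFilterWrapper {
    expr: Arc<FilterExpr>,
    remaining_emitters: usize,
}

impl TopKFilterWrapper {
    fn new(expr: Arc<FilterExpr>, output_partitions: usize) -> Self {
        Self {
            expr,
            remaining_emitters: output_partitions,
        }
    }

    // Keep the same expression, but start fresh emitter state sized to the
    // current partition count rather than copying the old counter.
    fn rebuild_for(&self, output_partitions: usize) -> Self {
        Self::new(Arc::clone(&self.expr), output_partitions)
    }

    fn mark_partition_done(&mut self) {
        self.remaining_emitters = self.remaining_emitters.saturating_sub(1);
    }

    fn is_complete(&self) -> bool {
        self.remaining_emitters == 0
    }
}

fn main() {
    let expr = Arc::new(FilterExpr {
        name: "topk filter on a".to_string(),
    });

    // Wrapper first built for a single output partition.
    let stale = TopKFilterWrapper::new(Arc::clone(&expr), 1);

    // Partitioning changes to two outputs: rebuild instead of cloning.
    let mut rebuilt = stale.rebuild_for(2);
    assert!(Arc::ptr_eq(&stale.expr, &rebuilt.expr));

    rebuilt.mark_partition_done();
    assert!(!rebuilt.is_complete()); // one partition is still pending
    rebuilt.mark_partition_done();
    assert!(rebuilt.is_complete());

    println!("{}: complete = {}", rebuilt.expr.name, rebuilt.is_complete());
}
```

In this sketch, a plain clone of `stale` would carry over a counter sized for one partition, so the filter would look complete after the first partition finished. That is the kind of stale emitter state the rebuild is meant to avoid.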