LiaCastaneda commented on code in PR #22991:
URL: https://github.com/apache/datafusion/pull/22991#discussion_r3435811168
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -904,19 +904,51 @@ impl SortExec {
self.preserve_partitioning = preserve_partitioning;
Arc::make_mut(&mut self.cache).partitioning =
Self::output_partitioning_helper(&self.input,
self.preserve_partitioning);
+ if self.fetch.is_some() {
+ self.rebuild_filter_for_current_partitioning();
+ }
self
}
- /// Add or reset `self.filter` to a new `TopKDynamicFilters`.
+ fn topk_emitter_count(&self) -> usize {
+ self.cache.output_partitioning().partition_count()
+ }
+
+ /// Build a new shared TopK dynamic filter wrapper for this `SortExec`.
fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
let children = self
.expr
.iter()
.map(|sort_expr| Arc::clone(&sort_expr.expr))
.collect::<Vec<_>>();
- Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
- DynamicFilterPhysicalExpr::new(children, lit(true)),
- ))))
+ self.create_filter_with_expr(Arc::new(DynamicFilterPhysicalExpr::new(
+ children,
+ lit(true),
+ )))
+ }
+
+ fn create_filter_with_expr(
+ &self,
+ expr: Arc<DynamicFilterPhysicalExpr>,
+ ) -> Arc<RwLock<TopKDynamicFilters>> {
+ Arc::new(RwLock::new(
+ TopKDynamicFilters::new_with_topk_emitter_count(
+ expr,
+ self.topk_emitter_count(),
+ ),
+ ))
+ }
+
+ /// Rebuild the shared TopK filter wrapper after output partitioning
changes.
+ ///
+ /// The dynamic filter expression is preserved, but wrapper state such as
the
+ /// shared threshold and remaining emitter count is reset for the new
+ /// partitioning.
+ fn rebuild_filter_for_current_partitioning(&mut self) {
Review Comment:
nit: Is it harmless to reconstruct the filter from scratch? I guess this
isn't an issue if it's only called at planning time, but if this function were
called during execution we could end up discarding an active
`shared_threshold`.
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -2764,6 +2799,71 @@ mod tests {
Ok(())
}
+ async fn emit_sort_partition(
+ sort: &Arc<SortExec>,
+ partition: usize,
+ task_ctx: Arc<TaskContext>,
+ ) -> Result<()> {
+ let _batches: Vec<RecordBatch> =
+ sort.execute(partition, task_ctx)?.try_collect().await?;
+ Ok(())
+ }
+
+ fn assert_filter_still_waiting(filter: &Arc<DynamicFilterPhysicalExpr>) {
+ let dynamic_filter_expr: Arc<dyn PhysicalExpr> =
+ Arc::<DynamicFilterPhysicalExpr>::clone(filter);
+ assert!(
+ matches!(
+ DynamicFilterTracking::classify(&dynamic_filter_expr),
+ DynamicFilterTracking::Watching(_)
+ ),
+ "the shared filter should remain watchable until every partition
emits"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_preserved_topk_filter_waits_for_all_sort_partitions() ->
Result<()> {
+ let schema = Arc::new(Schema::new(vec![Field::new("a",
DataType::Int32, false)]));
+ let partitions = vec![
+ vec![RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![Arc::new(Int32Array::from(vec![3, 1, 2]))],
+ )?],
+ vec![RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![Arc::new(Int32Array::from(vec![6, 4, 5]))],
+ )?],
+ ];
+ let input = TestMemoryExec::try_new_exec(&partitions,
Arc::clone(&schema), None)?;
+ let sort = SortExec::new(
+ [PhysicalSortExpr::new_default(Arc::new(Column::new("a",
0)))].into(),
+ input,
+ )
+ // `with_fetch` creates the TopK filter; preserving partitioning after
+ // that must rebuild it with one emitter per output partition.
+ .with_fetch(Some(2))
Review Comment:
Something codex highlighted: in `with_fetch`, if a dynamic filter already
exists it is cloned and reused, so it keeps the emitter count it was built
with, which is not necessarily the current partition count. I'm not sure
whether the emitter count and partition count can actually differ there, but
it seems that every other place that rebuilds the sort calls
`rebuild_filter_for_current_partitioning`. Should we call it there as well?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]