geoffreyclaude commented on code in PR #22991:
URL: https://github.com/apache/datafusion/pull/22991#discussion_r3437036972
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -904,19 +904,51 @@ impl SortExec {
self.preserve_partitioning = preserve_partitioning;
Arc::make_mut(&mut self.cache).partitioning =
Self::output_partitioning_helper(&self.input, self.preserve_partitioning);
+ if self.fetch.is_some() {
+ self.rebuild_filter_for_current_partitioning();
+ }
self
}
- /// Add or reset `self.filter` to a new `TopKDynamicFilters`.
+ fn topk_emitter_count(&self) -> usize {
+ self.cache.output_partitioning().partition_count()
+ }
+
+ /// Build a new shared TopK dynamic filter wrapper for this `SortExec`.
fn create_filter(&self) -> Arc<RwLock<TopKDynamicFilters>> {
let children = self
.expr
.iter()
.map(|sort_expr| Arc::clone(&sort_expr.expr))
.collect::<Vec<_>>();
- Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
- DynamicFilterPhysicalExpr::new(children, lit(true)),
- ))))
+ self.create_filter_with_expr(Arc::new(DynamicFilterPhysicalExpr::new(
+ children,
+ lit(true),
+ )))
+ }
+
+ fn create_filter_with_expr(
+ &self,
+ expr: Arc<DynamicFilterPhysicalExpr>,
+ ) -> Arc<RwLock<TopKDynamicFilters>> {
+ Arc::new(RwLock::new(
+ TopKDynamicFilters::new_with_topk_emitter_count(
+ expr,
+ self.topk_emitter_count(),
+ ),
+ ))
+ }
+
+ /// Rebuild the shared TopK filter wrapper after output partitioning changes.
+ ///
+ /// The dynamic filter expression is preserved, but wrapper state such as the
+ /// shared threshold and remaining emitter count is reset for the new
+ /// partitioning.
+ fn rebuild_filter_for_current_partitioning(&mut self) {
Review Comment:
Yes, exactly: this is plan-construction / plan-rewrite state, not runtime
execution state. Rebuilding the wrapper is intentional when the `SortExec`
properties change, because the wrapper owns plan-derived state such as the
shared threshold and expected emitter count. We should not carry an active
runtime threshold across that kind of rebuild.
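A minimal, self-contained sketch of the rebuild behavior described above. Everything in it is hypothetical: `MiniSort`, `TopKWrapper`, `FilterExpr`, and their fields are simplified stand-ins, not DataFusion's `SortExec`, `TopKDynamicFilters`, or `DynamicFilterPhysicalExpr`, and the filter is modeled as always present for brevity. It assumes, as `output_partitioning_helper` suggests, that the emitter count is 1 when partitioning is not preserved and the input partition count otherwise. The point it illustrates is that the rebuild keeps the same expression `Arc` while discarding a stale threshold and resizing the emitter count.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for `DynamicFilterPhysicalExpr`.
struct FilterExpr {
    label: &'static str,
}

/// Hypothetical, simplified model of the shared TopK wrapper: the filter
/// expression plus plan-derived state tied to the current partitioning.
struct TopKWrapper {
    expr: Arc<FilterExpr>,
    /// Tightest threshold published so far by any emitter (None = no bound yet).
    threshold: Option<i64>,
    /// Emitters (output partitions) that have not finished yet.
    remaining_emitters: usize,
}

impl TopKWrapper {
    fn new(expr: Arc<FilterExpr>, emitter_count: usize) -> Self {
        Self {
            expr,
            threshold: None,
            remaining_emitters: emitter_count,
        }
    }
}

/// Hypothetical miniature `SortExec` with only the fields this sketch needs.
struct MiniSort {
    input_partitions: usize,
    preserve_partitioning: bool,
    fetch: Option<usize>,
    filter: Arc<RwLock<TopKWrapper>>,
}

impl MiniSort {
    fn new(input_partitions: usize, fetch: Option<usize>) -> Self {
        let expr = Arc::new(FilterExpr { label: "dynamic TopK filter" });
        // preserve_partitioning starts out false, so one emitter is expected.
        let filter = Arc::new(RwLock::new(TopKWrapper::new(expr, 1)));
        Self {
            input_partitions,
            preserve_partitioning: false,
            fetch,
            filter,
        }
    }

    /// Assumption: one output partition unless partitioning is preserved.
    fn topk_emitter_count(&self) -> usize {
        if self.preserve_partitioning {
            self.input_partitions
        } else {
            1
        }
    }

    fn with_preserve_partitioning(mut self, preserve: bool) -> Self {
        self.preserve_partitioning = preserve;
        // Only a TopK sort (one with a fetch) has a dynamic filter to rebuild.
        if self.fetch.is_some() {
            self.rebuild_filter_for_current_partitioning();
        }
        self
    }

    fn rebuild_filter_for_current_partitioning(&mut self) {
        // Keep the same expression object...
        let expr = Arc::clone(&self.filter.read().unwrap().expr);
        // ...but replace the wrapper, resetting threshold and emitter count.
        self.filter = Arc::new(RwLock::new(TopKWrapper::new(
            expr,
            self.topk_emitter_count(),
        )));
    }
}

fn main() {
    let sort = MiniSort::new(4, Some(10));
    // Pretend a runtime threshold was set on the wrapper before the rewrite.
    sort.filter.write().unwrap().threshold = Some(42);
    let old_expr = Arc::clone(&sort.filter.read().unwrap().expr);

    let sort = sort.with_preserve_partitioning(true);
    let w = sort.filter.read().unwrap();
    assert!(Arc::ptr_eq(&w.expr, &old_expr)); // same expression object
    assert_eq!(w.threshold, None); // stale threshold dropped
    assert_eq!(w.remaining_emitters, 4); // one emitter per output partition
    println!(
        "{}: emitters = {}, threshold = {:?}",
        w.expr.label, w.remaining_emitters, w.threshold
    );
}
```

Keeping the expression `Arc` while swapping the wrapper presumably lets any operator that already holds a clone of the filter keep observing the same object, while the threshold and emitter bookkeeping start fresh for the new partition count.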