adriangb commented on code in PR #15301:
URL: https://github.com/apache/datafusion/pull/15301#discussion_r2022020818
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1197,35 +1197,55 @@ impl ExecutionPlan for SortExec {
     ) -> Result<SendableRecordBatchStream> {
         trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
 
-        let mut input = self.input.execute(partition, Arc::clone(&context))?;
-
-        let execution_options = &context.session_config().options().execution;
-
-        trace!("End SortExec's input.execute for partition: {}", partition);
-
         let sort_satisfied = self
             .input
             .equivalence_properties()
             .ordering_satisfy_requirement(&LexRequirement::from(self.expr.clone()));
 
+        let input_exec = Arc::clone(&self.input);
+
+        let execution_options = &context.session_config().options().execution;
+
+        trace!("End SortExec's input.execute for partition: {}", partition);
+
         match (sort_satisfied, self.fetch.as_ref()) {
-            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
-                input,
-                0,
-                Some(*fetch),
-                BaselineMetrics::new(&self.metrics_set, partition),
-            ))),
-            (true, None) => Ok(input),
+            (true, Some(fetch)) => {
+                let input = input_exec.execute(partition, Arc::clone(&context))?;
+                Ok(Box::pin(LimitStream::new(
+                    input,
+                    0,
+                    Some(*fetch),
+                    BaselineMetrics::new(&self.metrics_set, partition),
+                )))
+            }
+            (true, None) => self.input.execute(partition, Arc::clone(&context)),
             (false, Some(fetch)) => {
+                let schema = input_exec.schema();
                 let mut topk = TopK::try_new(
                     partition,
-                    input.schema(),
+                    schema,
                     self.expr.clone(),
                     *fetch,
                     context.session_config().batch_size(),
                     context.runtime_env(),
                     &self.metrics_set,
                 )?;
+                let input_exec = if context
+                    .session_config()
+                    .options()
+                    .optimizer
+                    .enable_dynamic_filter_pushdown
+                {
+                    // Try to push down the dynamic filter. If the execution plan doesn't
+                    // support it, push_down_filter will return None and we'll
+                    // keep the original input_exec.
+                    input_exec

Review Comment:
So I gave it a shot.
It was a FIGHT, and unfortunately I think I found the deal breaker way too late: `TopK` itself is only ever created in `SortExec::execute` and depends on the `partition` parameter to `execute`. So I don't see how we can get a reference to its state during an optimizer pass. Even if we address this case, I'm afraid there will be other optimizations (joins?) that we then can't complete because we boxed ourselves into doing it as an optimizer pass instead of at runtime.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
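To make the constraint in the review comment concrete, here is a toy sketch of the runtime alternative, written in plain Rust rather than against DataFusion. Every name in it (`DynamicFilter`, `TopK`, `execute_partition`) is hypothetical and does not correspond to DataFusion's actual types or APIs; rows are bare `i64` values and a loop stands in for the child plan's stream. What it shows: the per-partition top-k state, and the shared handle it publishes its threshold through, only come into existence once `execute_partition(partition, ..)` runs, so the handle is created at runtime and handed to the row producer, rather than wired up by a planning pass that runs before any partition exists.

```rust
use std::collections::BinaryHeap;
use std::sync::{Arc, RwLock};

/// Hypothetical shared filter handle (not a DataFusion type). It holds the
/// current top-k threshold (the largest of the k smallest values seen so far),
/// or None until the heap is full. Clones share the same underlying state.
#[derive(Clone, Default)]
struct DynamicFilter {
    threshold: Arc<RwLock<Option<i64>>>,
}

impl DynamicFilter {
    /// Returns false for rows that can no longer enter the top-k
    /// (ascending order, so anything >= the threshold is useless).
    fn keep(&self, value: i64) -> bool {
        match *self.threshold.read().unwrap() {
            Some(t) => value < t,
            None => true,
        }
    }

    fn update(&self, t: i64) {
        *self.threshold.write().unwrap() = Some(t);
    }
}

/// Hypothetical per-partition TopK keeping the `k` smallest values.
struct TopK {
    k: usize,
    heap: BinaryHeap<i64>, // max-heap: peek() is the current threshold
    filter: DynamicFilter,
}

impl TopK {
    fn insert(&mut self, v: i64) {
        if self.k == 0 {
            return; // nothing to keep
        }
        if self.heap.len() < self.k {
            self.heap.push(v);
        } else if v < *self.heap.peek().unwrap() {
            self.heap.pop();
            self.heap.push(v);
        } else {
            return; // cannot improve the current top-k
        }
        // Once the heap is full, publish its max so the producer can skip rows.
        if self.heap.len() == self.k {
            self.filter.update(*self.heap.peek().unwrap());
        }
    }
}

/// Hypothetical stand-in for `execute(partition, ..)`: state is built per call.
fn execute_partition(partition: usize, rows: &[Vec<i64>], k: usize) -> (Vec<i64>, usize) {
    // The filter and the TopK both exist only from this point on, so a
    // planning pass that runs before execution has nothing to hold on to.
    let filter = DynamicFilter::default();
    let mut topk = TopK { k, heap: BinaryHeap::new(), filter: filter.clone() };
    let mut skipped = 0;
    for &v in &rows[partition] {
        // Stand-in for a scan consulting the live filter before emitting a row.
        if filter.keep(v) {
            topk.insert(v);
        } else {
            skipped += 1;
        }
    }
    let mut out = topk.heap.into_vec();
    out.sort();
    (out, skipped)
}

fn main() {
    let rows: Vec<Vec<i64>> = vec![vec![5, 1, 9, 3, 7, 2, 8], vec![4, 4, 0]];
    for p in 0..rows.len() {
        let (top, skipped) = execute_partition(p, &rows, 2);
        println!("partition {p}: top-2 = {top:?}, rows skipped = {skipped}");
    }
}
```

With `k = 2`, partition 0 keeps `[1, 2]` and skips `9`, `7` and `8` without ever handing them to `TopK`, because the threshold tightened as the heap filled. In this toy the check sits in the same loop for brevity; a fuller version would presumably push the threshold into the scan as a predicate, which is exactly the kind of wiring that needs the per-partition state to exist first.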