alamb commented on code in PR #15301:
URL: https://github.com/apache/datafusion/pull/15301#discussion_r2025316592


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1067,35 +1067,53 @@ impl ExecutionPlan for SortExec {
     ) -> Result<SendableRecordBatchStream> {
         trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
 
-        let mut input = self.input.execute(partition, Arc::clone(&context))?;
-
-        let execution_options = &context.session_config().options().execution;
-
-        trace!("End SortExec's input.execute for partition: {}", partition);
-
         let sort_satisfied = self
             .input
             .equivalence_properties()
             .ordering_satisfy_requirement(&LexRequirement::from(self.expr.clone()));
 
+        let input_exec = Arc::clone(&self.input);
+
+        let execution_options = &context.session_config().options().execution;
+
+        trace!("End SortExec's input.execute for partition: {}", partition);
+
         match (sort_satisfied, self.fetch.as_ref()) {
-            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
-                input,
-                0,
-                Some(*fetch),
-                BaselineMetrics::new(&self.metrics_set, partition),
-            ))),
-            (true, None) => Ok(input),
+            (true, Some(fetch)) => {
+                let input = input_exec.execute(partition, Arc::clone(&context))?;
+                Ok(Box::pin(LimitStream::new(
+                    input,
+                    0,
+                    Some(*fetch),
+                    BaselineMetrics::new(&self.metrics_set, partition),
+                )))
+            }
+            (true, None) => self.input.execute(partition, Arc::clone(&context)),
             (false, Some(fetch)) => {
+                let schema = input_exec.schema();
                 let mut topk = TopK::try_new(
                     partition,
-                    input.schema(),
+                    schema,
                     self.expr.clone(),
                     *fetch,
                     context.session_config().batch_size(),
                     context.runtime_env(),
                     &self.metrics_set,
                 )?;
+                let input_exec = if context
+                    .session_config()
+                    .options()
+                    .optimizer
+                    .enable_dynamic_filter_pushdown
+                    && input_exec.supports_dynamic_filter_pushdown()
+                {
+                    input_exec
+                        .push_down_dynamic_filter(topk.dynamic_filter_source())?

Review Comment:
   I think this code now lives in a physical optimizer pass.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to