alamb commented on code in PR #15301:
URL: https://github.com/apache/datafusion/pull/15301#discussion_r2021697091


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1197,35 +1197,55 @@ impl ExecutionPlan for SortExec {
     ) -> Result<SendableRecordBatchStream> {
         trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
 
-        let mut input = self.input.execute(partition, Arc::clone(&context))?;
-
-        let execution_options = &context.session_config().options().execution;
-
-        trace!("End SortExec's input.execute for partition: {}", partition);
-
         let sort_satisfied = self
             .input
             .equivalence_properties()
             .ordering_satisfy_requirement(&LexRequirement::from(self.expr.clone()));
 
+        let input_exec = Arc::clone(&self.input);
+
+        let execution_options = &context.session_config().options().execution;
+
+        trace!("End SortExec's input.execute for partition: {}", partition);
+
         match (sort_satisfied, self.fetch.as_ref()) {
-            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
-                input,
-                0,
-                Some(*fetch),
-                BaselineMetrics::new(&self.metrics_set, partition),
-            ))),
-            (true, None) => Ok(input),
+            (true, Some(fetch)) => {
+                let input = input_exec.execute(partition, Arc::clone(&context))?;
+                Ok(Box::pin(LimitStream::new(
+                    input,
+                    0,
+                    Some(*fetch),
+                    BaselineMetrics::new(&self.metrics_set, partition),
+                )))
+            }
+            (true, None) => self.input.execute(partition, Arc::clone(&context)),
             (false, Some(fetch)) => {
+                let schema = input_exec.schema();
                 let mut topk = TopK::try_new(
                     partition,
-                    input.schema(),
+                    schema,
                     self.expr.clone(),
                     *fetch,
                     context.session_config().batch_size(),
                     context.runtime_env(),
                     &self.metrics_set,
                 )?;
+                let input_exec = if context
+                    .session_config()
+                    .options()
+                    .optimizer
+                    .enable_dynamic_filter_pushdown
+                {
+                    // Try to push down the dynamic filter. If the execution plan doesn't
+                    // support it, push_down_filter will return None and we'll
+                    // keep the original input_exec.
+                    input_exec

Review Comment:
   I can try to help do this later in the week as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to