pepijnve commented on code in PR #16319: URL: https://github.com/apache/datafusion/pull/16319#discussion_r2136240231
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1126,14 +1127,20 @@ impl ExecutionPlan for SortExec {
         Ok(Box::pin(RecordBatchStreamAdapter::new(
             self.schema(),
             futures::stream::once(async move {
-                while let Some(batch) = input.next().await {
-                    let batch = batch?;
-                    topk.insert_batch(batch)?;
-                    if topk.finished {
-                        break;
+                // Spawn a task the first time the stream is polled for the sort phase.
+                // This ensures the consumer of the sort does not poll unnecessarily
+                // while the sort is ongoing

Review Comment:
I've added a test case that attempts to demonstrate that processing is deferred. If this looks OK to you, I can add the same thing for the other touched code as well.

I'm not sure how I can demonstrate the absence of multi-threading in a test case.

Wrt comprehensibility, I have to admit I am still very much in the learning-as-I-go phase of using the futures crate. There might be a more elegant or straightforward way to express this construct.
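For illustration, the deferral property the test aims to show can be sketched with the standard library alone. This is not the PR's test or DataFusion code: `deferred_sort`, `check_deferral` and `NoopWaker` are hypothetical names, and a plain `Vec<u32>` stands in for record batches. The point is that an `async move` block (the construct passed to `futures::stream::once` in the hunk above) does no work when it is created, only when it is first polled.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough to poll a future that never returns Pending.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for the sort phase (not DataFusion code): records how
/// many values it processed and returns them sorted.
fn deferred_sort(processed: Arc<AtomicUsize>, batch: Vec<u32>) -> impl Future<Output = Vec<u32>> {
    async move {
        // This body only runs once the future is polled.
        processed.fetch_add(batch.len(), Ordering::SeqCst);
        let mut sorted = batch;
        sorted.sort();
        sorted
    }
}

/// Builds the future, then reads the counter before and after the first poll.
/// Returns (count_before_poll, count_after_poll, sorted_output).
fn check_deferral() -> (usize, usize, Vec<u32>) {
    let processed = Arc::new(AtomicUsize::new(0));
    let mut fut = Box::pin(deferred_sort(Arc::clone(&processed), vec![3, 1, 2]));

    // Constructing the future must not have run its body yet.
    let before = processed.load(Ordering::SeqCst);

    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let sorted = match fut.as_mut().poll(&mut cx) {
        Poll::Ready(values) => values,
        Poll::Pending => unreachable!("the async block has no await points"),
    };

    let after = processed.load(Ordering::SeqCst);
    (before, after, sorted)
}
```

Calling `check_deferral()` should yield a count of 0 before the poll and 3 after it. The same laziness carries over when the future is wrapped in `futures::stream::once`, since that stream only polls its future when it is itself polled.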