alamb commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2114732484


##########
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##########
@@ -77,6 +77,9 @@ impl AggregateStream {
         let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
         let input = agg.input.execute(partition, Arc::clone(&context))?;
 
+        // Wrap no‐grouping aggregates in our YieldStream

Review Comment:
   I think we should add the rationale for doing this here; otherwise it is not clear why the input is wrapped.
   
   For example, something like
   ```suggestion
           // Yield control back to tokio after a certain number of batches so it can check for cancellation.
           // See https://github.com/apache/datafusion/issues/16193
   ```
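To make the rationale concrete: a hand-written `poll_next` cannot `.await`, so "yielding to tokio" means waking the current task and returning `Poll::Pending`. The runtime only gets a chance to act on an abort or a dropped stream between polls. The std-only sketch below shows that mechanism with a hypothetical `YieldNow` future and a no-op waker standing in for the tokio executor. In async functions, `tokio::task::yield_now` serves a similar purpose. This is an illustration under those assumptions, not the code in this PR.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that returns `Pending` exactly once before completing.
/// Waking itself first marks the task as immediately runnable again, but
/// control still goes back to the executor between the two polls, which is
/// where a runtime can notice the task was aborted or its output dropped.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // Ask to be polled again, then give the thread back to the executor.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that does nothing; stands in for a real executor in this sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn main() {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = YieldNow { yielded: false };

    // First poll hands control back to the caller.
    assert!(Pin::new(&mut fut).poll(&mut cx).is_pending());
    // Second poll completes.
    assert!(Pin::new(&mut fut).poll(&mut cx).is_ready());
    println!("yielded once, then completed");
}
```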



##########
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##########
@@ -170,6 +173,65 @@ impl AggregateStream {
     }
 }
 
+/// A stream that yields batches of data, yielding control back to the executor
+pub struct YieldStream {
+    inner: SendableRecordBatchStream,
+    batches_processed: usize,
+    buffer: Option<Result<RecordBatch>>,
+}
+
+impl YieldStream {

Review Comment:
   This is a neat structure. I think it is more generally useful. Perhaps we could put it somewhere more discoverable, such as https://github.com/apache/datafusion/tree/main/datafusion/common-runtime/src



##########
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##########
@@ -170,6 +173,65 @@ impl AggregateStream {
     }
 }
 
+/// A stream that yields batches of data, yielding control back to the executor

Review Comment:
   ```suggestion
   /// A stream that yields batches of data, yielding control back to the executor every `YIELD_BATCHES` batches
   ///
   /// This can be useful to allow operators that might not yield to check for cancellation
   ```



##########
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##########
@@ -170,6 +173,65 @@ impl AggregateStream {
     }
 }
 
+/// A stream that yields batches of data, yielding control back to the executor
+pub struct YieldStream {
+    inner: SendableRecordBatchStream,
+    batches_processed: usize,
+    buffer: Option<Result<RecordBatch>>,
+}
+
+impl YieldStream {
+    pub fn new(inner: SendableRecordBatchStream) -> Self {
+        Self {
+            inner,
+            batches_processed: 0,
+            buffer: None,
+        }
+    }
+}
+
+// Stream<Item = Result<RecordBatch>> to poll_next_unpin
+impl Stream for YieldStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        const YIELD_BATCHES: usize = 64;

Review Comment:
   I think this would be easier to find, and the code easier to understand, if we moved it to a separate `const` in the module and added comments that describe it (i.e. the number of batches returned before checking whether the stream has been dropped)

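A minimal sketch of what this comment describes, with `YIELD_BATCHES` hoisted to a documented module-level `const`. To stay runnable with only the standard library, it assumes the inner stream can be modeled as an always-ready iterator (the kind of input that would otherwise never yield) and exposes an inherent `poll_next` shaped like `futures::Stream::poll_next`. The real `YieldStream` wraps a `SendableRecordBatchStream` and implements `Stream`; the no-op waker is also only a stand-in for an executor.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Number of batches returned before `YieldStream` yields control back to the
/// executor (returns `Poll::Pending` once), giving the runtime a chance to
/// notice cancellation, e.g. that the consumer has dropped the stream.
/// 64 is the value used in the PR; treat it as a tunable, not a tested optimum.
const YIELD_BATCHES: usize = 64;

/// Sketch only: the inner stream is modeled as an always-ready iterator.
struct YieldStream<I> {
    inner: I,
    batches_processed: usize,
}

impl<I: Iterator> YieldStream<I> {
    fn new(inner: I) -> Self {
        Self { inner, batches_processed: 0 }
    }

    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        if self.batches_processed >= YIELD_BATCHES {
            // Reset the counter, ask to be polled again, and hand control back.
            self.batches_processed = 0;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        let next = self.inner.next();
        if next.is_some() {
            self.batches_processed += 1;
        }
        Poll::Ready(next)
    }
}

/// Waker that does nothing; stands in for a real executor in this sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn main() {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut stream = YieldStream::new(0..200usize);

    let (mut items, mut pendings) = (0, 0);
    loop {
        match stream.poll_next(&mut cx) {
            Poll::Ready(Some(_)) => items += 1,
            Poll::Ready(None) => break,
            Poll::Pending => pendings += 1,
        }
    }
    // 200 batches with a yield after every 64: after batches 64, 128 and 192.
    println!("items={items} pendings={pendings}");
}
```

Design note: this variant checks the counter before polling the inner stream, so nothing has to be held across the yield. The PR's struct carries a `buffer: Option<Result<RecordBatch>>` field, which suggests it takes a different approach; the full `poll_next` body is not shown in this excerpt.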


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
