alamb commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2114732484

##########
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##########
@@ -77,6 +77,9 @@ impl AggregateStream {
         let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
         let input = agg.input.execute(partition, Arc::clone(&context))?;
 
+        // Wrap no‐grouping aggregates in our YieldStream

Review Comment:
   I think we should add the rationale for doing this here; otherwise it is not clear.

   For example, something like
   ```suggestion
           // Yield control back to tokio after a certain number of batches so it can check for cancellation.
           // See https://github.com/apache/datafusion/issues/16193
   ```

##########
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##########
@@ -170,6 +173,65 @@ impl AggregateStream {
     }
 }
 
+/// A stream that yields batches of data, yielding control back to the executor
+pub struct YieldStream {
+    inner: SendableRecordBatchStream,
+    batches_processed: usize,
+    buffer: Option<Result<RecordBatch>>,
+}
+
+impl YieldStream {

Review Comment:
   This is a neat structure, and I think it is more generally useful.

   Perhaps we could put it somewhere it is more likely to be discovered, such as https://github.com/apache/datafusion/tree/main/datafusion/common-runtime/src

##########
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##########
@@ -170,6 +173,65 @@ impl AggregateStream {
     }
 }
 
+/// A stream that yields batches of data, yielding control back to the executor

Review Comment:
   ```suggestion
   /// A stream that yields batches of data, yielding control back to the executor every `YIELD_BATCHES` batches
   ///
   /// This can be useful to allow operators that might not yield to check for cancellation
   ```

##########
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##########
@@ -170,6 +173,65 @@ impl AggregateStream {
     }
 }
 
+/// A stream that yields batches of data, yielding control back to the executor
+pub struct YieldStream {
+    inner: SendableRecordBatchStream,
+    batches_processed: usize,
+    buffer: Option<Result<RecordBatch>>,
+}
+
+impl YieldStream {
+    pub fn new(inner: SendableRecordBatchStream) -> Self {
+        Self {
+            inner,
+            batches_processed: 0,
+            buffer: None,
+        }
+    }
+}
+
+// Stream<Item = Result<RecordBatch>> to poll_next_unpin
+impl Stream for YieldStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        const YIELD_BATCHES: usize = 64;

Review Comment:
   I think this would be easier to find, and the code easier to understand, if we moved it to a separate `const` in the module and added comments that describe it (i.e. the number of batches returned prior to checking for drop).
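
For illustration only, here is a rough, std-only sketch of the pattern the comments above describe: a documented module-level `YIELD_BATCHES` constant and a wrapper that returns `Poll::Pending` after that many items so the executor gets a chance to run. This is not the code from the PR. A plain `Iterator` stands in for `SendableRecordBatchStream`, the PR's `buffer` field is left out because its use is not shown in the diff, and `YieldEvery`, `CountingWaker`, and `drain_counting_yields` are hypothetical names made up for this example.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Number of batches returned before yielding control back to the executor,
/// which gives it a chance to notice cancellation (value taken from the diff).
const YIELD_BATCHES: usize = 64;

/// Hypothetical wrapper: an `Iterator` stands in for the inner batch stream.
struct YieldEvery<I> {
    inner: I,
    batches_processed: usize,
}

impl<I: Iterator> YieldEvery<I> {
    fn new(inner: I) -> Self {
        Self { inner, batches_processed: 0 }
    }

    /// Returns `Pending` once per `YIELD_BATCHES` items, waking itself first
    /// so that the executor polls it again.
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        if self.batches_processed >= YIELD_BATCHES {
            self.batches_processed = 0;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        match self.inner.next() {
            Some(item) => {
                self.batches_processed += 1;
                Poll::Ready(Some(item))
            }
            None => Poll::Ready(None),
        }
    }
}

/// Hypothetical waker that only counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls the wrapper to completion by hand and returns
/// (items seen, `Pending` results seen, wake-ups recorded).
fn drain_counting_yields(total: usize) -> (usize, usize, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&counter));
    let mut cx = Context::from_waker(&waker);
    let mut stream = YieldEvery::new(0..total);
    let (mut items, mut pendings) = (0, 0);
    loop {
        match stream.poll_next(&mut cx) {
            Poll::Ready(Some(_)) => items += 1,
            Poll::Ready(None) => break,
            Poll::Pending => pendings += 1,
        }
    }
    (items, pendings, counter.0.load(Ordering::SeqCst))
}
```

Calling `drain_counting_yields(200)` drives 200 items through the wrapper and reports how many times it yielded along the way. In a real executor each `Pending` is the point at which a dropped (cancelled) task would simply never be polled again.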