alamb commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2114732484
##########
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##########
@@ -77,6 +77,9 @@ impl AggregateStream {
let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
let input = agg.input.execute(partition, Arc::clone(&context))?;
+ // Wrap no-grouping aggregates in our YieldStream
Review Comment:
I think we should add the rationale for doing this here, otherwise it is not clear.
For example, something like:
```suggestion
// Yield control back to tokio after a certain number of batches so it can check for cancellation.
// See https://github.com/apache/datafusion/issues/16193
```
##########
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##########
@@ -170,6 +173,65 @@ impl AggregateStream {
}
}
+/// A stream that yields batches of data, yielding control back to the executor
+pub struct YieldStream {
+ inner: SendableRecordBatchStream,
+ batches_processed: usize,
+ buffer: Option<Result<RecordBatch>>,
+}
+
+impl YieldStream {
Review Comment:
This is a neat structure -- I think it is more generally useful. Perhaps we
could put it somewhere it is more likely to be discovered, such as
https://github.com/apache/datafusion/tree/main/datafusion/common-runtime/src
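For example (a sketch only; the exact module path and re-export are assumptions), the call site here could then look something like:
```rust
// Sketch: assumes YieldStream is moved to the datafusion-common-runtime crate
// and re-exported from its root.
use datafusion_common_runtime::YieldStream;

let input = agg.input.execute(partition, Arc::clone(&context))?;
// Yield back to tokio every `YIELD_BATCHES` batches so cancellation can be observed
let input = YieldStream::new(input);
```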
##########
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##########
@@ -170,6 +173,65 @@ impl AggregateStream {
}
}
+/// A stream that yields batches of data, yielding control back to the executor
Review Comment:
```suggestion
/// A stream that yields batches of data, yielding control back to the executor every `YIELD_BATCHES` batches
///
/// This can be useful to allow operators that might not otherwise yield to check for cancellation
```
##########
datafusion/physical-plan/src/aggregates/no_grouping.rs:
##########
@@ -170,6 +173,65 @@ impl AggregateStream {
}
}
+/// A stream that yields batches of data, yielding control back to the executor
+pub struct YieldStream {
+ inner: SendableRecordBatchStream,
+ batches_processed: usize,
+ buffer: Option<Result<RecordBatch>>,
+}
+
+impl YieldStream {
+ pub fn new(inner: SendableRecordBatchStream) -> Self {
+ Self {
+ inner,
+ batches_processed: 0,
+ buffer: None,
+ }
+ }
+}
+
+// Stream<Item = Result<RecordBatch>> to poll_next_unpin
+impl Stream for YieldStream {
+ type Item = Result<RecordBatch>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ const YIELD_BATCHES: usize = 64;
Review Comment:
I think this would be easier to find, and would make the code easier to understand,
if we moved it to a separate `const` in the module and added a comment that describes
it (aka the number of batches returned prior to checking for drop).
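Something like the following (a sketch only; the value comes from the current code and the comment wording is just a suggestion):
```rust
/// The number of batches to process before yielding control back to tokio,
/// so the runtime can check whether the query has been cancelled (dropped).
const YIELD_BATCHES: usize = 64;
```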
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]