martin-g commented on code in PR #19002:
URL: https://github.com/apache/datafusion/pull/19002#discussion_r2576835953
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1427,9 +1430,12 @@ struct PerPartitionStream {
/// Execution metrics
baseline_metrics: BaselineMetrics,
+
+ batch_coalescer: Option<LimitedBatchCoalescer>,
}
impl PerPartitionStream {
+ #[allow(clippy::too_many_arguments)]
Review Comment:
```suggestion
#[expect(clippy::too_many_arguments)]
```
By using `expect` instead of `allow`, Clippy will also fail once the suppression is no longer needed, and the developer will have to remove the attribute. With `allow` it may silently become obsolete.
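For reference, a minimal standalone sketch (my own, not from the PR) of the difference; the `build_stream` function is hypothetical and only exists to trigger the lint:

```rust
// With `#[allow]`, the suppression stays silent forever, even after the lint
// stops firing. With `#[expect]`, Clippy reports `unfulfilled_lint_expectations`
// once the function no longer has too many arguments, prompting removal.
#[expect(clippy::too_many_arguments)]
fn build_stream(
    a: u8, b: u8, c: u8, d: u8, e: u8, f: u8, g: u8, h: u8,
) -> u8 {
    a + b + c + d + e + f + g + h
}

fn main() {
    // If `build_stream` is later refactored to take fewer arguments, Clippy
    // flags the now-unfulfilled `expect`, whereas an `allow` would linger.
    println!("{}", build_stream(1, 2, 3, 4, 5, 6, 7, 8));
}
```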
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1540,7 +1550,46 @@ impl Stream for PerPartitionStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- let poll = self.poll_next_inner(cx);
+ let poll = match self.batch_coalescer.take() {
+ Some(mut coalescer) => {
+ let cloned_time = self.baseline_metrics.elapsed_compute().clone();
+ let mut completed = false;
+ let poll;
+ loop {
+ if let Some(batch) = coalescer.next_completed_batch() {
+ poll = Poll::Ready(Some(Ok(batch)));
+ break;
+ }
+ if completed {
+ poll = Poll::Ready(None);
+ break;
+ }
+ let inner_poll = self.poll_next_inner(cx);
+ let _timer = cloned_time.timer();
Review Comment:
Shouldn't this be before the `poll_next_inner()` call, so that the time spent in the inner poll is counted in `elapsed_compute`?
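For context, a self-contained toy sketch (std types only, not DataFusion's `Time`/timer guard) of why the ordering matters: a scoped timer guard only accumulates time spent after it is created.

```rust
use std::time::{Duration, Instant};

// Toy scoped timer guard: accumulates elapsed time into a counter on drop,
// roughly mimicking how a metrics timer guard behaves.
struct ScopedTimer<'a> {
    start: Instant,
    acc: &'a mut Duration,
}

impl Drop for ScopedTimer<'_> {
    fn drop(&mut self) {
        *self.acc += self.start.elapsed();
    }
}

// Stand-in for the inner poll doing measurable work.
fn poll_next_inner() {
    std::thread::sleep(Duration::from_millis(10));
}

fn main() {
    // Timer created *after* the call, as in the diff above:
    // the inner work is not counted.
    let mut after = Duration::ZERO;
    {
        poll_next_inner();
        let _timer = ScopedTimer { start: Instant::now(), acc: &mut after };
    }

    // Timer created *before* the call, as the review suggests:
    // the time spent inside `poll_next_inner()` is attributed to the metric.
    let mut before = Duration::ZERO;
    {
        let _timer = ScopedTimer { start: Instant::now(), acc: &mut before };
        poll_next_inner();
    }

    println!("timer after the call:  {after:?}");
    println!("timer before the call: {before:?}");
}
```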
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1540,7 +1550,46 @@ impl Stream for PerPartitionStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- let poll = self.poll_next_inner(cx);
+ let poll = match self.batch_coalescer.take() {
+ Some(mut coalescer) => {
+ let cloned_time = self.baseline_metrics.elapsed_compute().clone();
+ let mut completed = false;
+ let poll;
+ loop {
+ if let Some(batch) = coalescer.next_completed_batch() {
+ poll = Poll::Ready(Some(Ok(batch)));
+ break;
+ }
+ if completed {
+ poll = Poll::Ready(None);
+ break;
+ }
+ let inner_poll = self.poll_next_inner(cx);
+ let _timer = cloned_time.timer();
+
+ match inner_poll {
+ Poll::Pending => {
+ poll = Poll::Pending;
+ break;
+ }
+ Poll::Ready(None) => {
+ completed = true;
+ coalescer.finish()?;
+ }
+ Poll::Ready(Some(Ok(batch))) => {
+ coalescer.push_batch(batch)?;
Review Comment:
```suggestion
if let Err(e) = coalescer.push_batch(batch) {
    self.batch_coalescer = Some(coalescer);
    return self.baseline_metrics.record_poll(Poll::Ready(Some(Err(e))));
}
```
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1540,7 +1550,46 @@ impl Stream for PerPartitionStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- let poll = self.poll_next_inner(cx);
+ let poll = match self.batch_coalescer.take() {
+ Some(mut coalescer) => {
+ let cloned_time = self.baseline_metrics.elapsed_compute().clone();
+ let mut completed = false;
+ let poll;
+ loop {
+ if let Some(batch) = coalescer.next_completed_batch() {
+ poll = Poll::Ready(Some(Ok(batch)));
+ break;
+ }
+ if completed {
+ poll = Poll::Ready(None);
+ break;
+ }
+ let inner_poll = self.poll_next_inner(cx);
+ let _timer = cloned_time.timer();
+
+ match inner_poll {
+ Poll::Pending => {
+ poll = Poll::Pending;
+ break;
+ }
+ Poll::Ready(None) => {
+ completed = true;
+ coalescer.finish()?;
Review Comment:
```suggestion
if let Err(e) = coalescer.finish() {
    self.batch_coalescer = Some(coalescer);
    return self.baseline_metrics.record_poll(Poll::Ready(Some(Err(e))));
}
```
Otherwise, in case of an error, `self.batch_coalescer` won't be restored.
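To illustrate why, a minimal standalone sketch (toy types, not the PR's) of the take-then-restore pattern both suggestions above follow:

```rust
// A field is moved out with `Option::take`, so every early-return path must
// put it back, or the next poll finds `None` and the state is lost.
struct Coalescer;

impl Coalescer {
    fn push_batch(&mut self, fail: bool) -> Result<(), String> {
        if fail {
            Err("push failed".to_string())
        } else {
            Ok(())
        }
    }
}

struct PerPartition {
    coalescer: Option<Coalescer>,
}

impl PerPartition {
    fn poll_once(&mut self, fail: bool) -> Result<(), String> {
        let mut coalescer = self.coalescer.take().expect("coalescer present");
        if let Err(e) = coalescer.push_batch(fail) {
            // Restore the field before bailing out, as the suggestions above do.
            self.coalescer = Some(coalescer);
            return Err(e);
        }
        self.coalescer = Some(coalescer);
        Ok(())
    }
}

fn main() {
    let mut stream = PerPartition { coalescer: Some(Coalescer) };
    assert!(stream.poll_once(true).is_err());
    // Because the error path restored the coalescer, a later poll still works.
    assert!(stream.poll_once(false).is_ok());
}
```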