pepijnve commented on issue #16482: URL: https://github.com/apache/datafusion/issues/16482#issuecomment-3005212491
@ding-young I think the gist of it is indeed correct. I tinkered a bit locally with `spill_reader_yield` and condensed it down to the version below, which does the same thing but is a bit easier to grasp since it fits on one page.

```
async fn spill_reader_yield() -> Result<(), Box<dyn Error>> {
    use datafusion_execution::RecordBatchStream;
    use datafusion_physical_plan::common::spawn_buffered;
    use futures::Stream;

    // A mock stream that always returns `Poll::Ready(Some(...))` immediately
    let always_ready =
        make_lazy_exec("value", false).execute(0, SessionContext::new().task_ctx())?;

    // Wrap the producer so that it resembles how read_stream from a spill file is constructed
    let stream = make_cooperative(always_ready);

    // Set a large buffer so that it always has free space for the producer/sender
    let buffer_capacity = 100_000;
    let mut mock_stream = spawn_buffered(stream, buffer_capacity);
    let schema = mock_stream.schema();

    let consumer_stream = futures::stream::poll_fn(move |cx| {
        use arrow::compute::concat_batches;

        let mut collected = vec![];
        // To make sure that the inner stream is polled multiple times, loop forever
        // as long as the inner (producer) stream returns Ready
        loop {
            match mock_stream.as_mut().poll_next(cx) {
                Poll::Ready(Some(Ok(batch))) => {
                    println!("received batch from inner");
                    collected.push(batch);
                }
                Poll::Ready(Some(Err(e))) => {
                    println!("error from inner");
                    return Poll::Ready(Some(Err(e)));
                }
                Poll::Ready(None) => {
                    println!("inner stream ended");
                    break;
                }
                Poll::Pending => {
                    // Polling the inner stream may return Pending only when it reaches
                    // its budget, since the producer (`always_ready`) always returns Ready
                    return Poll::Pending;
                }
            }
        }

        // This should be unreachable since the stream is canceled
        let combined = concat_batches(&mock_stream.schema(), &collected)
            .expect("Failed to concat batches");

        Poll::Ready(Some(Ok(combined)))
    });

    let consumer_record_batch_stream =
        Box::pin(RecordBatchStreamAdapter::new(schema, consumer_stream));

    stream_yields(consumer_record_batch_stream).await
}
```
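For anyone who wants to see the mechanism without the DataFusion test helpers, here is a minimal std-only sketch of why a greedy consumer loop like the one above only sees `Poll::Pending` once a budget runs out. `AlwaysReady`, `Budgeted`, `CountingWaker`, and `drive` are hypothetical names made up for this illustration, and the fixed per-wrapper counter is an assumption that only approximates the idea behind `make_cooperative`; it is not how DataFusion implements it.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical producer that is always ready: yields 0..limit, then ends.
struct AlwaysReady {
    next: u32,
    limit: u32,
}

impl AlwaysReady {
    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next < self.limit {
            let value = self.next;
            self.next += 1;
            Poll::Ready(Some(value))
        } else {
            Poll::Ready(None)
        }
    }
}

/// Hypothetical budget wrapper (an assumption, not DataFusion's code):
/// after `budget` polls of the inner stream it returns Pending once,
/// wakes the task so it gets rescheduled, and resets the budget.
struct Budgeted {
    inner: AlwaysReady,
    budget: u32,
    remaining: u32,
}

impl Budgeted {
    fn new(inner: AlwaysReady, budget: u32) -> Self {
        Budgeted { inner, budget, remaining: budget }
    }

    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.remaining == 0 {
            self.remaining = self.budget;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.remaining -= 1;
        self.inner.poll_next(cx)
    }
}

/// Waker that only counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Drives a greedy consumer to completion by hand and returns
/// (items collected, times the consumer had to yield, wake-ups seen).
fn drive(limit: u32, budget: u32) -> (usize, usize, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let mut stream = Budgeted::new(AlwaysReady { next: 0, limit }, budget);
    let mut collected = Vec::new();
    let mut yields = 0;

    loop {
        // One "poll" of the consumer: keep pulling while the inner stream is Ready.
        let done = loop {
            match stream.poll_next(&mut cx) {
                Poll::Ready(Some(value)) => collected.push(value),
                Poll::Ready(None) => break true,
                // Only reachable when the budget is exhausted.
                Poll::Pending => break false,
            }
        };
        if done {
            break;
        }
        yields += 1;
    }

    (collected.len(), yields, counter.0.load(Ordering::SeqCst))
}
```

With these definitions, `drive(10, 4)` returns `(10, 2, 2)`: the consumer collects all ten items but is forced to yield twice, once after each run of four ready polls. Without the wrapper, the inner loop would never return control to the caller until the producer ended.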