ding-young commented on code in PR #16616: URL: https://github.com/apache/datafusion/pull/16616#discussion_r2246843792
########## datafusion/core/tests/execution/coop.rs: ########## @@ -250,6 +252,63 @@ async fn agg_grouped_topk_yields( query_yields(aggr, session_ctx.task_ctx()).await } +#[rstest] +#[tokio::test] +// A test that mocks the behavior of `SpillManager::read_spill_as_stream` without file access +// to verify that a cooperative stream would properly yields in a spill file read scenario +async fn spill_reader_stream_yield() -> Result<(), Box<dyn Error>> { + use arrow::compute::concat_batches; + use datafusion_physical_plan::common::spawn_buffered; + + // A mock stream that always returns `Poll::Ready(Some(...))` immediately + let always_ready = + make_lazy_exec("value", false).execute(0, SessionContext::new().task_ctx())?; + + // this function makes a consumer stream that resembles how read_stream from spill file is constructed + let stream = make_cooperative(always_ready); + + // Set large buffer so that buffer always has free space for the producer/sender + let buffer_capacity = 100_000; + let mut mock_stream = spawn_buffered(stream, buffer_capacity); + let schema = mock_stream.schema(); + + let consumer_stream = futures::stream::poll_fn(move |cx| { + let mut collected = vec![]; + // To make sure that inner stream is polled multiple times, loop until the buffer is full + // Ideally, the stream will yield before the loop ends + for _ in 0..buffer_capacity { + match mock_stream.as_mut().poll_next(cx) { + Poll::Ready(Some(Ok(batch))) => { + collected.push(batch); + } + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(Err(e))); + } + Poll::Ready(None) => { + break; + } + Poll::Pending => { + // polling inner stream may return Pending only when it reaches budget, since + // we intentionally made ProducerStream always return Ready + return Poll::Pending; + } + } + } + + // This should be unreachable since the stream is canceled + assert!(false, "This should be unreachable if cooperative stream properly yields"); Review Comment: Added!
Thanks @comphead -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org