ding-young commented on code in PR #16616:
URL: https://github.com/apache/datafusion/pull/16616#discussion_r2247700676


##########
datafusion/core/tests/execution/coop.rs:
##########
@@ -250,6 +252,58 @@ async fn agg_grouped_topk_yields(
     query_yields(aggr, session_ctx.task_ctx()).await
 }
 
+#[rstest]
+#[tokio::test]
+// A test that mocks the behavior of `SpillManager::read_spill_as_stream` 
without file access
+// to verify that a cooperative stream would properly yields in a spill file 
read scenario
+async fn spill_reader_stream_yield() -> Result<(), Box<dyn Error>> {
+    use datafusion_physical_plan::common::spawn_buffered;
+
+    // A mock stream that always returns `Poll::Ready(Some(...))` immediately
+    let always_ready =
+        make_lazy_exec("value", false).execute(0, 
SessionContext::new().task_ctx())?;
+
+    // this function makes a consumer stream that resembles how read_stream 
from spill file is constructed
+    let stream = make_cooperative(always_ready);
+
+    // Set large buffer so that buffer always has free space for the 
producer/sender
+    let buffer_capacity = 100_000;
+    let mut mock_stream = spawn_buffered(stream, buffer_capacity);
+    let schema = mock_stream.schema();
+
+    let consumer_stream = futures::stream::poll_fn(move |cx| {
+        let mut collected = vec![];
+        // To make sure that inner stream is polled multiple times, loop until 
the buffer is full
+        // Ideally, the stream will yield before the loop ends
+        for _ in 0..buffer_capacity {
+            match mock_stream.as_mut().poll_next(cx) {
+                Poll::Ready(Some(Ok(batch))) => {
+                    collected.push(batch);
+                }
+                Poll::Ready(Some(Err(e))) => {
+                    return Poll::Ready(Some(Err(e)));
+                }
+                Poll::Ready(None) => {
+                    break;
+                }
+                Poll::Pending => {
+                    // polling inner stream may return Pending only when it 
reaches budget, since
+                    // we intentionally made ProducerStream always return Ready
+                    return Poll::Pending;
+                }
+            }
+        }
+
+        // This should be unreachable since the stream is canceled
+        unreachable!("Expected the stream to be canceled, but it continued 
polling");
+    });

Review Comment:
   Updated to use `unreachable!` here, as Clippy suggested. Thanks, @comphead!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to