djanderson commented on code in PR #75:
URL: https://github.com/apache/datafusion-site/pull/75#discussion_r2152791907


##########
content/blog/2025-06-15-cancellation.md:
##########
@@ -0,0 +1,328 @@
+# Query Cancellation
+
+## The Challenge of Cancelling Long-Running Queries
+
+Have you ever tried to cancel a query that just wouldn't stop?
+In this post, we'll take a look at why that can happen in DataFusion and what 
the community did to resolve the problem in depth.
+
+### Understanding Rust's Async Model
+
+To really understand the cancellation problem you need to be somewhat familiar 
with Rust's asynchronous programming model.
+This is a bit different than what you might be used to from other ecosystems.
+Let's go over the basics again as a refresher.
+If you're familiar with the ins and outs of `Future` and `async` you can skip 
this section.
+
+#### Futures Are Inert
+
+Rust's asynchronous programming model is built around the `Future<T>` trait.
+In contrast to, for instance, Javascript's `Promise` or Java's `Future` a Rust 
`Future` does not necessarily represent an actively running asynchronous job.
+Instead, a `Future<T>` represents a lazy calculation that only makes progress 
when explicitly polled.
+If nothing tells a `Future` to try and make progress explicitly, it is an 
inert object.
+
+You ask a `Future<T>`to advance its calculation as much as possible by calling 
the `poll` method.
+The future responds with either:
+- `Poll::Pending` if it needs to wait for something (like I/O) before it can 
continue
+- `Poll::Ready<T>` when it has completed and produced a value
+
+When a future returns `Pending`, it saves its internal state so it can pick up 
where it left off the next time you poll it.
+This state management is what makes Rust's futures memory-efficient and 
composable.
+It also needs to set up the necessary signaling so that the caller gets 
notified when it should try to call `poll` again.
+This avoids having to call `poll` in a busy-waiting loop.
+
+Rust's `async` keyword provides syntactic sugar over this model.
+When you write an `async` function or block, the compiler transforms it into 
an implementation of the `Future` trait for you.
+Since all the state management is compiler generated and hidden from sight, 
async code tends to be more readable while maintaining the same underlying 
mechanics.
+
+The `await` keyword complements this by letting you pause execution until a 
future completes.
+When you write `.await` after a future, you're essentially telling the 
compiler to poll that future until it's ready before program execution 
continues with the statement after the await.
+
+#### From Futures to Streams
+
+The `futures` crate extends the `Future` model with the `Stream` trait.
+Streams represent a sequence of values produced asynchronously rather than 
just a single value.
+The `Stream` trait has one method named `poll_next` that returns:
+- `Poll::Pending` when the next value isn't ready yet, just like a `Future` 
would
+- `Poll::Ready(Some(value))` when a new value is available
+- `Poll::Ready(None)` when the stream is exhausted
+
+### How DataFusion Executes Queries
+
+In DataFusion, queries are executed as follows:
+
+1. First the query is compiled into a tree of `ExecutionPlan` nodes
+2. `ExecutionPlan::execute` is called on the root of the tree. This method 
returns a `SendableRecordBatchStream` (a pinned `Box<dyn Stream<RecordBatch>>`)
+3. `Stream::poll_next` is called in a loop to get the results
+
+The stream we get in step 2 is actually the root of a tree of streams that 
mostly mirrors the execution plan tree.
+Each stream tree node processes the record batches it gets from its children.
+The leaves of the tree produce record batches themselves.
+
+Query execution progresses each time you call `poll_next` on the root stream.
+This call typically cascades down the tree, with each node calling `poll_next` 
on its children to get the data it needs to process.
+
+Here's where the first signs of problems start to show up: some operations 
(like aggregations, sorts, or certain join phases) need to process a lot of 
data before producing any output.
+When `poll_next` encounters one of these operations, it might need to perform 
a substantial amount of work before it can return a record batch.
+
+### Tokio and Cooperative Scheduling
+
+We need to make a small detour now via Tokio's scheduler before we can get to 
the query cancellation problem.
+DataFusion makes use of the Tokio asynchronous runtime, which uses a 
cooperative scheduling model.
+This is fundamentally different from preemptive scheduling that you might be 
used to:
+
+- In preemptive scheduling, the system can interrupt a task at any time to run 
something else
+- In cooperative scheduling, tasks must voluntarily yield control back to the 
scheduler
+
+This distinction is crucial for understanding our cancellation problem.
+When a Tokio task is running, it can't be forcibly interrupted - it must 
cooperate by periodically yielding control.
+
+Similarly, when you try to abort a task by calling `JoinHandle::abort()`, the 
Tokio runtime can't immediately force it to stop.
+You're just telling Tokio: "When this task next yields control, don't resume 
it."
+If the task never yields, it can't be aborted.
+
+### The Cancellation Problem
+
+With all the necessary background in place, now let's look at how the 
DataFusion CLI tries to run and cancel a query.
+The code below paraphrases what the CLI actually does:
+
+```rust
+fn exec_query() {
+    let runtime: tokio::runtime::Runtime = ...;
+    let stream: SendableRecordBatchStream = ...;
+
+    runtime.block_on(async {
+        tokio::select! {
+            next_batch = stream.next() => ...
+            _ = signal::ctrl_c() => ...,
+        }
+    })
+}
+```
+
+First the CLI sets up a Tokio runtime instance.
+It then reads the query you want to execute from standard input or file and 
turns it into a stream.
+Then if calls `next` on stream which is an `async` wrapper for `poll_next`.
+It passes this to the `select!` macro along with a ctrl-C handler.
+
+The `select!` macro is supposed to race these two futures and complete when 
either one finishes.
+When you press Ctrl+C, the `signal::ctrl_c()` future should complete, allowing 
the query to be cancelled.
+
+But there's a catch: `select!` still follows cooperative scheduling rules.
+It polls each future in sequence, and if the first one (our query) gets stuck 
in a long computation, it never gets around to polling the cancellation signal.
+
+Imagine a query that needs to calculate something intensive, like sorting 
billions of rows.
+The `poll_next` call might run for hours without returning.
+During this time, Tokio can't check if you've pressed Ctrl+C, and the query 
continues running despite your cancellation request.
+
+## A Closer Look at Blocking Operators
+
+Let's peel back a layer of the onion and look at what's happening in a 
blocking `poll_next` implementation.
+Here's a drastically simplified version of a `COUNT(*)` aggregation - 
something you might use in a query like `SELECT COUNT(*) FROM table`:
+
+```rust
+struct BlockingStream {
+    stream: SendableRecordBatchStream,
+    count: usize,
+    finished: bool,
+}
+
+impl Stream for BlockingStream {
+    type Item = Result<RecordBatch>;
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
+        if self.finished {
+            return Poll::Ready(None);
+        }
+
+        loop {
+            match ready!(self.stream.poll_next_unpin(cx)) {
+                Some(Ok(batch)) => self.count += batch.num_rows(),
+                None => {
+                    self.finished = true;
+                    return 
Poll::Ready(Some(Ok(create_record_batch(self.count))));
+                }
+                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
+            }
+        }
+    }
+}
+```
+
+How does this code work? Let's break it down step by step:
+
+1. **Initial check**: We first check if we've already finished processing. If 
so, we return `Ready(None)` to signal the end of our stream:
+   ```rust
+   if self.finished {
+       return Poll::Ready(None);
+   }
+   ```
+
+2. **Processing loop**: If we're not done yet, we enter a loop to process 
incoming batches from our input stream:
+   ```rust
+   loop {
+       match ready!(self.stream.poll_next_unpin(cx)) {
+           // Handle different cases...
+       }
+   }
+   ```
+   The `ready!` macro checks if the input stream returned `Pending` and if so, 
immediately returns `Pending` from our function as well.
+
+3. **Processing data**: For each batch we receive, we simply add its row count 
to our running total:
+   ```rust
+   Some(Ok(batch)) => self.count += batch.num_rows(),
+   ```
+
+4. **End of input**: When the child stream is exhausted (returns `None`), we 
calculate our final result and convert it into a record batch (omitted for 
brevity):
+   ```rust
+   None => {
+       self.finished = true;
+       return Poll::Ready(Some(Ok(create_record_batch(self.count))));
+   }
+   ```
+
+5. **Error handling**: If we encounter an error, we pass it along immediately:
+   ```rust
+   Some(Err(e)) => return Poll::Ready(Some(Err(e))),
+   ```
+
+This code looks perfectly reasonable at first glance.
+But there's a subtle issue lurking here: what happens if the input stream 
consistently returns `Ready` and never returns `Pending`?
+
+In that case, the processing loop will keep running without ever yielding 
control back to Tokio's scheduler.
+This means we could be stuck in a single `poll_next` call for minutes or even 
hours - exactly the scenario that prevents query cancellation from working!
+
+## Unblocking Operators
+
+Now let's look at how we can ensure we return `Pending` every now and then.
+
+### Independent Cooperative Operators
+
+One simple way to achieve this is using a loop counter.
+We do the exact same thing as before, but on each loop iteration we decrement 
our counter.
+If the counter hits zero we return `Pending`.
+This ensures we iterate at most 128 times before yielding.
+
+```rust
+struct CountingSourceStream {
+   counter: usize
+}
+
+impl Stream for CountingSourceStream {
+    type Item = Result<RecordBatch>;
+   
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
+        if self.counter >= self.128 {
+            self.counter = 0;
+            cx.waker().wake_by_ref();
+            return Poll::Pending;
+        }
+
+        self.counter += 1;
+        let batch = ...;
+        Ready(Some(Ok(batch)))
+    }
+}
+```
+
+This would be a simple solution for this source operator.
+If our simple aggregation operator consumes this stream it will receive a 
`Pending` periodically causing it to yield.
+Could it be that simple?
+
+Unfortunately, no.
+Let's look at what happens when we start combining operators in more complex 
configurations?
+Suppose we create a plan like this.
+
+```
+          Merge       
+      ┌─────┴─────┐   
+    Filter  CountingSource
+      │               
+CountingSource            
+```
+
+Each source is producing a pending every 128 batches.
+The filter is the standard DataFusion filter, with a filter expression that 
drops a batch every 50 record batches.
+Merge is a simple combining operator the uses `futures::stream::select` to 
combine two stream.
+
+When we set this in motion, the merge operator will poll the left and right 
branch in a round-robin fashion.
+The sources will each emit `Pending` every 128 batches, but the filter 
dropping batches makes it so that these arrive out-of-phase at the merge 
operator.
+As a consequence the merge operator will always have the opportunity of 
polling the other stream when one of the two returns `Pending`.
+The output stream of merge is again an always ready stream.
+If we pass this to our aggregating operator we're right back where we started.
+
+### Coordinated Cooperation
+
+Wouldn't it be great if we could get all the operators to coordinate amongst 
each other?
+When one of them determines that it's time to yield, all the other operators 
agree and start returning `Pending` as well.
+That way our task would be coaxed towards yielding even if it tried to poll 
many different operators.
+
+Luckily the [developers of Tokio ran into the exact same 
problem](https://tokio.rs/blog/2020-04-preemption) described above when network 
servers were under heavy load and come up with a solution.
+Back in 2020 already, Tokio 0.2.14 introduced a per-task operation budget.
+Rather than having individual counter littered throughout the code, the 
runtime manages a per task counter which is decremented by Tokio resources.
+When the counter hits zero, all resources start returning `Pending`.
+The task will then yield, after which the Tokio runtime resets the counter.
+
+As it turns out DataFusion was already using this mechanism implicitly.
+Every exchange-like operator internally makes use of a Tokio multiple 
producer, single consumer `Channel`.
+When calling `Receiver::recv` for one of these channels, a unit of Tokio task 
budget is consumed.
+As a consequence, query plans that made use of exchange-like operators were 
already mostly cancelable.
+
+### Depleting The Budget

Review Comment:
   Thanks for the context!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to