jeffreyssmith2nd opened a new issue, #14036:
URL: https://github.com/apache/datafusion/issues/14036

   ### Describe the bug
   
   **TL;DR: Reading many large Parquet files can prevent a query from being cancelled.**
   
   
   We have a customer that is running a query similar to the following (edited 
for privacy):
   ```
   SELECT DISTINCT "A","B","C","D","E" FROM table WHERE "time" > now() - INTERVAL '5 days';
   ```
   
   This produces a fairly straightforward plan ([explain.txt](https://github.com/user-attachments/files/18323208/explain.txt)).
   
   Simplified Plan:
   ```
   AggregateExec mode=FinalPartitioned
     CoalesceBatchesExec target_batch_size=8192
       RepartitionExec input_partitions=4
         AggregateExec mode=Partial
           ParquetExec file_groups={4 groups}
   ```
   
   This query reads ~85 Parquet files of ~100MB each. What we've seen is that even when the query is cancelled, the resources associated with it (CPU/RAM) are still consumed for almost as long as a typical execution of the query.
   
   ### To Reproduce
   
   _No response_
   
   ### Expected behavior
   
   Cancelling a query should (within some reasonable amount of time) truly 
cancel the query, freeing up system resources for other query executions.
   
   ### Additional context
   
   This appears to be a problem with the interaction between how the [`GroupedHashAggregateStream`](https://github.com/apache/arrow-datafusion/blob/9a808eb7f63385e3e3b02051ed30eee91daeb613/datafusion/physical-plan/src/aggregates/row_hash.rs#L620) and the [`FileStream`](https://github.com/apache/arrow-datafusion/blob/9a808eb7f63385e3e3b02051ed30eee91daeb613/datafusion/core/src/datasource/physical_plan/file_stream.rs#L246) approach yielding.
   
   The `GroupedHashAggregateStream` loops indefinitely until its child stream (in this case a `FileStream`) is exhausted, errors, or returns `Pending`, as sketched below.
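
   For illustration, here is a minimal sketch of the shape of that poll loop. The types and helpers (`RecordBatch` stub, `InputStream`, `group_aggregate_batch`, `emit_result`) are hypothetical stand-ins, not the actual DataFusion code:
   ```
   use std::pin::Pin;
   use std::task::{Context, Poll};

   use futures::{Stream, StreamExt};

   // Hypothetical stand-ins so the sketch is self-contained.
   struct RecordBatch;
   type Error = Box<dyn std::error::Error + Send + Sync>;
   type InputStream = Pin<Box<dyn Stream<Item = Result<RecordBatch, Error>> + Send>>;

   fn group_aggregate_batch(_batch: RecordBatch) {
       // accumulate the batch into the grouping hash table (elided)
   }

   fn emit_result() -> Result<RecordBatch, Error> {
       Ok(RecordBatch) // emit the aggregated output (elided)
   }

   // Simplified shape of the aggregation poll loop: it keeps polling its
   // input in a tight loop and only returns control to the runtime when
   // the input errors, is exhausted, or itself returns Pending.
   fn poll_next_sketch(
       input: &mut InputStream,
       cx: &mut Context<'_>,
   ) -> Poll<Option<Result<RecordBatch, Error>>> {
       loop {
           match input.poll_next_unpin(cx) {
               // A batch arrived: accumulate it, then immediately poll
               // for the next one -- no yield back to Tokio on this arm.
               Poll::Ready(Some(Ok(batch))) => group_aggregate_batch(batch),
               Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
               // Input exhausted: emit the aggregated result.
               Poll::Ready(None) => return Poll::Ready(Some(emit_result())),
               // The only other way out: the child must return Pending.
               Poll::Pending => return Poll::Pending,
           }
       }
   }
   ```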
   
   The `FileStream` loops while reading `RecordBatch`es from the current file, doing some bookkeeping along the way to stage the next file for reading. The stream returns `Ready` when a `RecordBatch` is produced or an error is encountered, but it **does not** return `Pending` at any point: when one file is finished, the next file is swapped in and reading continues immediately from the new file, as the sketch below shows.
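
   A minimal, hypothetical sketch of that behaviour (a synchronous iterator stands in for the real async reader):
   ```
   use std::task::Poll;

   // Hypothetical stand-ins so the sketch is self-contained.
   struct RecordBatch;
   type Error = Box<dyn std::error::Error + Send + Sync>;
   type BatchReader = std::vec::IntoIter<Result<RecordBatch, Error>>;

   struct FileStreamSketch {
       current: BatchReader,
       queued_files: Vec<BatchReader>,
   }

   impl FileStreamSketch {
       // Every path through this loop produces Ready: when the current
       // file runs out, the next file is swapped in and reading resumes
       // within the same call, so the stream never returns Pending.
       fn poll_next_sketch(&mut self) -> Poll<Option<Result<RecordBatch, Error>>> {
           loop {
               match self.current.next() {
                   Some(item) => return Poll::Ready(Some(item)),
                   None => match self.queued_files.pop() {
                       // Swap in the next file and keep looping --
                       // no yield point here.
                       Some(next) => self.current = next,
                       // All files exhausted.
                       None => return Poll::Ready(None),
                   },
               }
           }
       }
   }
   ```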
   
   The combination of these two behaviours means that when the `FileStream` is reading many large files, the `GroupedHashAggregateStream` never effectively yields back to Tokio. Since Tokio can only act on a cancellation at a yield point, the cancelled task keeps running.
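
   The effect is easy to reproduce outside DataFusion. This standalone example (assuming Tokio's default multi-threaded runtime) shows that `JoinHandle::abort` only takes effect at a yield point, so a task that never yields keeps consuming CPU after being aborted:
   ```
   use std::sync::atomic::{AtomicU64, Ordering};
   use std::sync::Arc;
   use std::time::Duration;

   #[tokio::main]
   async fn main() {
       let counter = Arc::new(AtomicU64::new(0));
       let c = Arc::clone(&counter);

       // A task that never awaits: Tokio gets no opportunity to observe
       // the abort below.
       let handle = tokio::spawn(async move {
           loop {
               c.fetch_add(1, Ordering::Relaxed);
           }
       });

       tokio::time::sleep(Duration::from_millis(10)).await;
       handle.abort(); // request cancellation...

       let before = counter.load(Ordering::Relaxed);
       tokio::time::sleep(Duration::from_millis(100)).await;
       let after = counter.load(Ordering::Relaxed);

       // ...but the counter keeps climbing: abort only takes effect at
       // a yield point, and this task never reaches one.
       println!("after abort, the task kept running: {before} -> {after}");
   }
   ```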
   
   My [PR](https://github.com/apache/datafusion/pull/14028) to fix this has the `FileStream` return `Pending` when swapping over to a new file. This seems like a natural yield point, and it resolves the cancellation issue, as sketched below.
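
   Sketched against the hypothetical `FileStreamSketch` above (the actual PR differs in the details), the idea is to wake the task and then return `Pending` at the swap, so the runtime can observe cancellation before the next file is read:
   ```
   use std::task::{Context, Poll};

   impl FileStreamSketch {
       fn poll_next_with_yield(
           &mut self,
           cx: &mut Context<'_>,
       ) -> Poll<Option<Result<RecordBatch, Error>>> {
           loop {
               match self.current.next() {
                   Some(item) => return Poll::Ready(Some(item)),
                   None => match self.queued_files.pop() {
                       Some(next) => {
                           self.current = next;
                           // Arrange to be polled again right away,
                           // then yield: a cancelled task is dropped
                           // here instead of reading the next file.
                           cx.waker().wake_by_ref();
                           return Poll::Pending;
                       }
                       None => return Poll::Ready(None),
                   },
               }
           }
       }
   }
   ```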

