lilic opened a new issue, #20448:
URL: https://github.com/apache/datafusion/issues/20448

   ### Describe the bug
   
   👋 I ran into this while testing some things and noticed that the TopK 
operator (`ORDER BY ... LIMIT k`) panics with `"overflow"` when the combined 
string data of the selected rows in a `Utf8` (i32-offset) column exceeds 
`i32::MAX` (~2GB).
   
   `TopKHeap::emit_with_state()` builds a single `RecordBatch` from all `k` 
rows via one [`interleave_record_batch()` 
call](https://github.com/apache/datafusion/blob/ace9cd44b7356d60e6d69d0b98ac3f5606d55507/datafusion/physical-plan/src/topk/mod.rs#L785).
 When the total byte length of the interleaved strings overflows `i32`, 
`arrow-select`'s `interleave_bytes()` panics at `interleave.rs` via 
`.expect("overflow")` instead of returning an error.
   
   ### To Reproduce
   
   Minimal reproducer:
   ```rust
   use datafusion::arrow::array::{Int32Array, StringArray};
   use datafusion::arrow::datatypes::{DataType, Field, Schema};
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::datasource::MemTable;
   use datafusion::prelude::*;
   use std::sync::Arc;
   
   #[tokio::main]
   async fn main() {
       println!("Creating batches with ~800MB strings each...");
   
       let schema = Arc::new(Schema::new(vec![
           Field::new("id", DataType::Int32, false),
           Field::new("big_str", DataType::Utf8, false),
       ]));
   
       let big = "x".repeat(800_000_000); // ~800MB per string
   
       // 1 partition with 3 separate batches (not 3 partitions!).
       // With a single partition, SortExec uses TopK internally for ORDER BY + LIMIT.
       // TopK receives each batch from the stream, maintains a heap, and on emit
       // calls interleave_record_batch across stored batches — triggering the overflow.
       let batches_in_partition: Vec<RecordBatch> = (0..3)
           .map(|i| {
               let mut ids = vec![i]; // big-string row gets low id
               let mut strs: Vec<&str> = vec![big.as_str()];
               for j in 0..9 {
                   ids.push(100 + i * 10 + j); // high ids — won't be in top 3
                   strs.push("small");
               }
               RecordBatch::try_new(
                   schema.clone(),
                   vec![
                       Arc::new(Int32Array::from(ids)),
                       Arc::new(StringArray::from(strs)),
                   ],
               )
               .unwrap()
           })
           .collect();
   
       // Single partition containing all 3 batches
       let table = MemTable::try_new(schema, vec![batches_in_partition]).unwrap();
   
       let ctx = SessionContext::new();
       ctx.register_table("t", Arc::new(table)).unwrap();
   
       println!("Running: SELECT * FROM t ORDER BY id LIMIT 3");
       println!("This triggers TopK which calls interleave_record_batch on the 
3 big rows...");
   
       // ORDER BY + LIMIT triggers TopK (via SortExec with fetch).
       let df = ctx.sql("SELECT * FROM t ORDER BY id LIMIT 3").await.unwrap();
       match df.collect().await {
           Ok(_) => println!("Unexpectedly succeeded"),
           Err(e) => println!("Got error: {e}"),
       }
   }
   
   ```
   
   **`Cargo.toml`:**
   ```toml
   ...
   [dependencies]
   datafusion = "52.1"
   tokio = { version = "1", features = ["full"] }
   ```
   
   Panic output (partial, see the attached file for the full panic trace):
   ```
   thread 'main' (75754020) panicked at /Users/lili/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/arrow-select-57.3.0/src/interleave.rs:180:41:
   overflow
   ```
   
   [panic.log](https://github.com/user-attachments/files/25441905/panic.log)
   
   Note on SLT reproducer: I attempted to write a sqllogictest for this but couldn't easily get it to trigger the panic via pure SQL.
   
   ### Expected behavior
   
   The query should either succeed or return a proper error, but it should not panic.
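
   For comparison, declaring the column as `LargeUtf8` (i64 offsets) removes the per-array ~2GB cap, so a query over such wide strings can at least in principle succeed. A hedged sketch of how the reproducer's column could be built that way (untested; assumes the rest of the plan copes with ~2.4GB of string data, and this is a workaround rather than a fix):
   
   ```rust
   // Sketch only: the reproducer's big_str column built as LargeUtf8 so that
   // ~2.4GB of combined string data stays representable by the offsets.
   use datafusion::arrow::array::{Int32Array, LargeStringArray};
   use datafusion::arrow::datatypes::{DataType, Field, Schema};
   use datafusion::arrow::record_batch::RecordBatch;
   use std::sync::Arc;
   
   fn large_utf8_batch(ids: Vec<i32>, strs: Vec<&str>) -> RecordBatch {
       let schema = Arc::new(Schema::new(vec![
           Field::new("id", DataType::Int32, false),
           // LargeUtf8 uses i64 offsets, so a single array's value buffer is
           // not capped at i32::MAX bytes.
           Field::new("big_str", DataType::LargeUtf8, false),
       ]));
       RecordBatch::try_new(
           schema,
           vec![
               Arc::new(Int32Array::from(ids)),
               Arc::new(LargeStringArray::from(strs)),
           ],
       )
       .unwrap()
   }
   ```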
   
   ### Additional context
   
   The immediate panic site is in `arrow-select`'s `interleave_bytes()` which 
uses `.expect("overflow")`. But the structural issue is that 
`TopKHeap::emit_with_state()` tries to build one unbounded `RecordBatch` via a 
single `interleave_record_batch()` call without accounting for offset limits.
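
   One possible shape for a guard, purely as illustration (hypothetical helper, not existing DataFusion code): before the single `interleave_record_batch()` call, sum the string bytes of the rows about to be emitted and return a regular `DataFusionError` when i32 offsets cannot represent the output:
   
   ```rust
   // Hypothetical helper, not existing DataFusion code: pre-check the Utf8
   // columns that emit_with_state() is about to interleave and return an error
   // instead of panicking once i32 offsets can no longer represent the output.
   use datafusion::arrow::array::{Array, StringArray};
   use datafusion::arrow::datatypes::DataType;
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::error::{DataFusionError, Result};
   
   fn check_utf8_fits(batches: &[RecordBatch], indices: &[(usize, usize)]) -> Result<()> {
       let Some(first) = batches.first() else {
           return Ok(());
       };
       for col in 0..first.num_columns() {
           if first.column(col).data_type() != &DataType::Utf8 {
               continue;
           }
           // Sum the byte length of every selected value in this column.
           let mut total: usize = 0;
           for &(batch_idx, row_idx) in indices {
               let array = batches[batch_idx]
                   .column(col)
                   .as_any()
                   .downcast_ref::<StringArray>()
                   .expect("Utf8 column");
               total += array.value(row_idx).len();
           }
           if total > i32::MAX as usize {
               return Err(DataFusionError::ResourcesExhausted(format!(
                   "TopK output would need {total} bytes in a Utf8 column, \
                    which exceeds the i32 offset limit"
               )));
           }
       }
       Ok(())
   }
   ```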
   
   This panic occurs on the latest DataFusion 52.1.0 + arrow 57.3.0; it also reproduces on `main`. Thanks for reading!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

