kosiew opened a new pull request, #20866:
URL: https://github.com/apache/datafusion/pull/20866

   
   ## Which issue does this PR close?
   
   * Part of #20788
   
   ## Rationale for this change
   
   `UnnestExec` currently expands each input `RecordBatch` into a fully 
unnested output batch before emitting any rows downstream. For high-fanout 
inputs, this can create very large intermediate batches and significantly 
increase peak memory usage, especially in query shapes like `unnest + group by` 
where downstream operators may also need to buffer data.
   
   Issue #20788 demonstrates this with large array columns, where unnesting 
multiplies row counts dramatically and causes memory usage to grow far beyond 
the size of the original input. This change improves the execution strategy by 
emitting unnested results in smaller, size-bounded chunks, which helps keep 
memory usage under control while preserving existing semantics.
   
   ## What changes are included in this PR?
   
   This PR updates `UnnestExec` to process input batches incrementally instead 
of materializing the full unnested output for the entire input batch at once.
   
   The main changes are:
   
   * Add chunked draining for input batches in `UnnestStream` via a 
`PendingBatch` state object.
   * Respect the session `batch_size` when deciding how many input rows to 
slice and unnest into the next output batch.
   * Add estimation logic (`next_input_slice_row_count`, 
`estimate_row_output_rows`, and `list_output_length`) to bound the number of 
output rows produced per emitted batch.
   * Use a conservative single-row fallback for recursive unnest (`depth > 1`): 
slice one input row at a time, so that expansion across nested levels is never 
underestimated.
   * Centralize list array dispatch in a new helper (`as_list_array_type`) and 
reuse it in both row-length estimation and existing list unnest logic.
   * Ensure empty output batches are still skipped.
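
The slicing decision described in the list above can be sketched roughly as 
follows. This is a simplified model, not the code in this PR: it borrows the 
names `estimate_row_output_rows` and `next_input_slice_row_count` from the 
description, but the signatures are hypothetical, each input row is reduced to 
the lengths of its list cells (`None` standing for a null list), and it assumes 
a row's output length is the longest list among the unnested columns, with a 
null list counting as one row only when `preserve_nulls` is set.

```rust
/// Estimated output rows for one input row (assumed rule: the longest list
/// among the unnested columns). `None` models a null list, which is assumed
/// to yield one row when `preserve_nulls` is true and zero rows otherwise.
fn estimate_row_output_rows(list_lens: &[Option<usize>], preserve_nulls: bool) -> usize {
    let null_len = if preserve_nulls { 1 } else { 0 };
    list_lens
        .iter()
        .map(|len| len.unwrap_or(null_len))
        .max()
        .unwrap_or(0)
}

/// Number of input rows, starting at `offset`, to slice for the next output
/// batch so that the estimated output stays within `batch_size`.
/// Hypothetical signature: the real helper works on Arrow arrays.
fn next_input_slice_row_count(
    rows: &[Vec<Option<usize>>],
    offset: usize,
    batch_size: usize,
    preserve_nulls: bool,
) -> usize {
    let mut taken = 0;
    let mut estimated = 0usize;
    for row in &rows[offset..] {
        let n = estimate_row_output_rows(row, preserve_nulls);
        // Always take at least one row so the stream makes progress.
        if taken > 0 && estimated + n > batch_size {
            break;
        }
        estimated += n;
        taken += 1;
    }
    taken
}
```

Because this sketch always takes at least one row, a single row whose lists are 
longer than `batch_size` would still produce an oversized batch here. How the 
PR handles that case is not stated in this description.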
   
   In addition, this PR adds focused tests covering:
   
   * high-fanout single-column unnest with output split across multiple batches,
   * multi-column unnest where per-row list lengths differ,
   * `preserve_nulls = false` behavior under batch slicing,
   * recursive unnest fallback behavior under a small batch size.
   
   ## Are these changes tested?
   
   Yes.
   
   This PR adds new tests in `datafusion/core/tests/dataframe/mod.rs` to verify 
both correctness and chunking behavior. The tests validate:
   
   * total output row counts remain correct,
   * emitted batch sizes do not exceed the configured `batch_size`,
   * output values match expected results for single-column, multi-column, 
null-dropping, and recursive unnest cases.
   
   These tests specifically exercise the new batch-slicing path introduced in 
`UnnestExec`.
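
The two batch-level invariants listed above (total row count preserved, no 
emitted batch larger than `batch_size`) can be illustrated with a small 
stand-alone model. It is not the test code added in 
`datafusion/core/tests/dataframe/mod.rs`: the function names are hypothetical, 
and the input is just the number of output rows each input row expands to.

```rust
/// Hypothetical model of chunked draining: returns the row count of each
/// emitted batch, given how many output rows every input row expands to.
fn emitted_batch_sizes(per_row_output: &[usize], batch_size: usize) -> Vec<usize> {
    let mut batches = Vec::new();
    let mut offset = 0;
    while offset < per_row_output.len() {
        let mut taken = 0;
        let mut rows_out = 0;
        for &n in &per_row_output[offset..] {
            // Take at least one input row per slice to guarantee progress.
            if taken > 0 && rows_out + n > batch_size {
                break;
            }
            rows_out += n;
            taken += 1;
        }
        offset += taken;
        // Empty output batches are skipped rather than emitted.
        if rows_out > 0 {
            batches.push(rows_out);
        }
    }
    batches
}

/// Checks the two invariants: the total row count is preserved and every
/// emitted batch is non-empty and no larger than `batch_size`.
fn check_chunking(per_row_output: &[usize], batch_size: usize) -> bool {
    let batches = emitted_batch_sizes(per_row_output, batch_size);
    let total: usize = batches.iter().sum();
    total == per_row_output.iter().sum::<usize>()
        && batches.iter().all(|&b| b > 0 && b <= batch_size)
}
```

In this model, `check_chunking` fails for an input whose single row expands to 
more rows than `batch_size`, since the model never splits one input row across 
batches.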
   
   ## Are there any user-facing changes?
   
   There are no intended user-facing API changes.
   
   This change improves execution behavior and memory characteristics for 
queries using `unnest`, especially on high-fanout list data. Query results 
remain the same, but output may now be emitted in smaller record batches.
   
   ## LLM-generated code disclosure
   
   This PR includes LLM-generated code and comments. All LLM-generated content 
has been manually reviewed and tested.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]