kosiew opened a new pull request, #19110:
URL: https://github.com/apache/datafusion/pull/19110
## Which issue does this PR close?
* Closes #19058.
## Rationale for this change
The `spill_pool` channel test `test_reader_catches_up_to_writer` has been
intermittently failing due to a race condition between the reader and writer
tasks. The test relies on time-based sleeps and polling to infer when the
reader has progressed, which can lead to non-deterministic ordering of events
under varying scheduler or machine load. This manifests as assertions observing
unexpected event sequences (for example, seeing `3` instead of the expected
`5`), making the test flaky and causing noise in CI.
In addition, the public `spill_pool::channel` usage example in the
documentation currently uses a spawned writer task and manual `unwrap` calls,
which both complicate the example and encourage a pattern that can reintroduce
similar races in user code. Simplifying the example and demonstrating explicit
writer finalization provides clearer guidance on correct usage of the spill
channel.
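As a rough illustration of the write-then-finalize pattern, here is a std-only analog. It does not use DataFusion: `Batch`, `SpillWriter`, `push_batch`, `channel`, and `write_then_read` are hypothetical stand-ins backed by `std::sync::mpsc`, not the real `spill_pool` API. The sketch shows a synchronous write loop that propagates errors with `?`, followed by an explicit `drop(writer)` that signals end-of-stream so the reader's loop terminates without a `break`.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical stand-in for a record batch (not a DataFusion type).
#[derive(Debug, Clone, PartialEq)]
struct Batch(u32);

/// Hypothetical writer half; NOT the DataFusion `spill_pool` writer.
struct SpillWriter {
    tx: Sender<Batch>,
}

impl SpillWriter {
    /// Fails if the reader half has already been dropped.
    fn push_batch(&self, batch: &Batch) -> Result<(), String> {
        self.tx
            .send(batch.clone())
            .map_err(|e| format!("reader gone: {e}"))
    }
}

/// Hypothetical constructor returning a (writer, reader) pair.
fn channel() -> (SpillWriter, Receiver<Batch>) {
    let (tx, rx) = mpsc::channel();
    (SpillWriter { tx }, rx)
}

fn write_then_read(n: u32) -> Result<Vec<Batch>, String> {
    let (writer, reader) = channel();

    // Synchronous write loop: no spawned task, errors propagate with `?`.
    for i in 0..n {
        writer.push_batch(&Batch(i))?;
    }

    // Dropping the writer closes the channel, signalling end-of-stream.
    // Without this, `reader.iter()` below would block forever because the
    // sender is still alive on this same thread.
    drop(writer);

    // The iterator ends once the channel is closed and drained.
    Ok(reader.iter().collect())
}

fn main() -> Result<(), String> {
    let batches = write_then_read(5)?;
    assert_eq!(batches.len(), 5);
    println!("read {} batches", batches.len());
    Ok(())
}
```

The design point carried over from the PR is that end-of-stream is signalled by dropping the writer rather than inferred by the reader, so the reader needs no timing assumptions to know when to stop.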
By making the test synchronization explicit and deterministic, and by
improving the example to show a straightforward, error-propagating pattern,
this PR eliminates the flakiness and makes both the tests and documentation
more robust.
## What changes are included in this PR?
* **Clarify `spill_pool::channel` usage example**
  * Replace the `tokio::spawn`-based writer with a simple, synchronous loop
    that pushes a fixed number of batches into the channel.
  * Use `?` instead of `unwrap()` for `push_batch` to show idiomatic error
    propagation in examples.
  * Explicitly `drop(writer)` after the write loop to document the
    requirement to finalize the spill file and signal end-of-stream to the reader.
  * Simplify the reader side by removing the explicit `break` and the unused
    join handle, while still asserting that the expected number of batches is read.
* **Deflake `test_reader_catches_up_to_writer` in `spill_pool.rs`**
  * Introduce two `tokio::sync::oneshot` channels to synchronize the reader
    and writer:
    * `reader_waiting_tx/rx` signals when the reader has started and is
      pending on the first `next()` call.
    * `first_read_done_tx/rx` signals when the reader has finished
      processing the first batch.
  * Replace the previous time-based delay (`tokio::time::sleep`) and polling
    loop with these explicit synchronization points to ensure a deterministic
    ordering of events:
    1. Reader starts and pends on the first `next()`.
    2. Writer pushes the first batch, waking the reader.
    3. Reader processes the first batch and records the event.
    4. Writer is unblocked to push the second batch.
  * Retain the existing `ReadWriteEvent` tracking and assertions, but make
    their ordering stable across runs by removing the dependence on timing and
    scheduler behavior.
  * Colocate the comments describing the synchronization logic with the
    primitives they describe, improving readability and maintainability.
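The four-step handshake can be sketched without tokio. This is a std-only analog, not the actual test: threads replace async tasks, and single-use `std::sync::mpsc::sync_channel(1)` pairs stand in for `tokio::sync::oneshot`. The names `reader_waiting` and `first_read_done` mirror the PR's channels; `Event`, `run_handshake`, and the data channel are hypothetical. Because every step waits on an explicit signal instead of a sleep, the recorded order is identical on every run.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical analog of `ReadWriteEvent`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Event {
    Write(u32),
    Read(u32),
}

fn run_handshake() -> Vec<Event> {
    let events = Arc::new(Mutex::new(Vec::<Event>::new()));
    // Data channel carrying "batches" from writer to reader.
    let (data_tx, data_rx) = mpsc::channel::<u32>();
    // One-shot style signals: each is sent and received exactly once.
    let (reader_waiting_tx, reader_waiting_rx) = mpsc::sync_channel::<()>(1);
    let (first_read_done_tx, first_read_done_rx) = mpsc::sync_channel::<()>(1);

    let reader_events = Arc::clone(&events);
    let reader = thread::spawn(move || {
        // Step 1: announce that the reader has started and is about to block.
        reader_waiting_tx.send(()).unwrap();
        let first = data_rx.recv().unwrap();
        // Step 3: record the first read, then unblock the writer.
        reader_events.lock().unwrap().push(Event::Read(first));
        first_read_done_tx.send(()).unwrap();
        // Drain remaining batches until the writer drops its sender.
        for batch in data_rx.iter() {
            reader_events.lock().unwrap().push(Event::Read(batch));
        }
    });

    // Writer side runs on the current thread.
    reader_waiting_rx.recv().unwrap();
    // Step 2: push the first batch, waking the reader.
    events.lock().unwrap().push(Event::Write(1));
    data_tx.send(1).unwrap();
    // Step 4: push the second batch only after the first has been processed.
    first_read_done_rx.recv().unwrap();
    events.lock().unwrap().push(Event::Write(2));
    data_tx.send(2).unwrap();
    // Finalize: dropping the sender ends the reader's drain loop.
    drop(data_tx);

    reader.join().unwrap();
    let recorded = events.lock().unwrap().clone();
    recorded
}

fn main() {
    let events = run_handshake();
    assert_eq!(
        events,
        vec![Event::Write(1), Event::Read(1), Event::Write(2), Event::Read(2)]
    );
    println!("{:?}", events);
}
```

In this analog the reader signals just before calling `recv`, so "pending" is approximate; the ordering still holds because each recorded event depends on a message the other side has already sent, which is the property that removes the timing dependence.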
Overall, the behavior of the production `spill_pool` logic is unchanged; only
the example and tests are updated to be more robust and clearer.
## Are these changes tested?
Yes. The crate's doctests were run 200 times in a loop that stops at the first failure:
```shell
for i in {1..200}; do
  echo "Run #$i started"
  cargo test -p datafusion-physical-plan --profile ci --doc -q || break
  echo "Run #$i completed"
done
```
* The updated `test_reader_catches_up_to_writer` has been run repeatedly to
confirm that it no longer exhibits flaky behavior with the new synchronization
scheme.
* The rest of the `spill_pool` test module continues to pass without
modification.
* The documentation example was compiled as part of the crate to ensure it
builds and type-checks with the updated API usage.
No new tests are required beyond the refactored existing test, as the
behavior under test remains the same; only the test harness and example have
been made deterministic and more idiomatic.
## Are there any user-facing changes?
* **API behavior:** No functional or behavioral changes to the public
  `spill_pool` APIs.
* **Documentation/example:** The `spill_pool::channel` usage example has
  been updated:
  * It now uses a synchronous write loop with proper error propagation (`?`).
  * It demonstrates explicitly dropping the writer to finalize the spill
    file.
  * It simplifies the reader code while preserving the core behavior of
    consuming all batches.
These changes are additive and clarifying, and do not introduce any breaking
changes.
## LLM-generated code disclosure
This PR includes LLM-generated code and comments. All LLM-generated content
has been manually reviewed and tested.