crepererum commented on code in PR #16093:
URL: https://github.com/apache/datafusion/pull/16093#discussion_r2107279416
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -595,30 +651,29 @@ impl ExecutionPlan for RepartitionExec {
// Get existing ordering to use for merging
let sort_exprs = self.sort_exprs().cloned().unwrap_or_default();
+ let state = Arc::clone(&self.state);
+ if !self.state_initialized.swap(true, Ordering::Relaxed) {
+ state.lock().ensure_input_streams_initialized(
Review Comment:
> Seems highly unlikely (we've run this under high load for a while and it has never happened)
I don't fancy hunting Heisenbugs in production, especially since some cloud
deployments with low or partial CPU allocations might be subject to this issue.
You can easily stall a thread for a few milliseconds.
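A minimal single-threaded replay of the interleaving, assuming a simplified stand-in for the repartition state. `State`, `Racy`, and the `stall` hook are hypothetical names, not DataFusion's API, and `std::sync::Mutex` replaces whatever lock the real code uses. Thread 1 wins the `swap`, then is descheduled before it takes the lock. During that stall, a second caller sees the flag already set, skips setup, and moves on with uninitialized streams. The `stall` closure runs the second caller inline, so the interleaving is deterministic.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

/// Hypothetical stand-in for the shared repartition state.
#[derive(Default)]
struct State {
    streams_initialized: bool,
}

impl State {
    /// Placeholder for the real stream setup work (not modeled here).
    fn ensure_input_streams_initialized(&mut self) {
        self.streams_initialized = true;
    }
}

/// Racy pattern: the flag is claimed *before* the lock is taken.
#[derive(Default)]
struct Racy {
    state: Mutex<State>,
    state_initialized: AtomicBool,
}

impl Racy {
    /// `stall` (hypothetical hook) simulates the winning thread being
    /// descheduled between the `swap` and the `lock()`. Returns whether the
    /// streams were ready when this caller moved on to polling.
    fn execute_with_stall(&self, stall: impl FnOnce()) -> bool {
        if !self.state_initialized.swap(true, Ordering::Relaxed) {
            // Flag is already visible as `true`, but no setup has happened yet.
            stall();
            self.state.lock().unwrap().ensure_input_streams_initialized();
        }
        self.state.lock().unwrap().streams_initialized
    }
}
```

Running it, the nested (second) call returns `false`. It proceeded to polling before any setup ran. A stronger memory ordering than `Relaxed` would not help, because the flag is published before the work it guards is done.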
> I'm not sure if we can replicate this reliably in a unit test
Fair, but the fact that the code is strongly typed and allows that to happen
is reason enough to change it. I'm OK w/o a test then. The expected behavior
would then be that you might need to block during stream polling (because
thread 1 may still hold the lock while running the preparation step, or the
poller itself needs to run it).
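One way to get that blocking behavior is to do the check and the setup inside the same critical section. A caller that arrives while another thread holds the lock then simply waits. This is a minimal sketch under stated assumptions: `State` and `Fixed` are hypothetical stand-ins, the setup is assumed to be idempotent, and `std::sync::Mutex` is used for illustration rather than the lock type in the PR.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for the shared repartition state.
#[derive(Default)]
struct State {
    streams_initialized: bool,
    init_calls: usize,
}

impl State {
    /// Assumed idempotent: only the first call does any (placeholder) work.
    fn ensure_input_streams_initialized(&mut self) {
        if !self.streams_initialized {
            self.init_calls += 1;
            self.streams_initialized = true;
        }
    }
}

/// Check-and-setup under one lock: there is no separate atomic flag that
/// could be claimed before the setup has actually finished.
#[derive(Default)]
struct Fixed {
    state: Mutex<State>,
}

impl Fixed {
    /// Returns whether the streams are ready when this caller starts polling.
    /// A concurrent caller blocks in `lock()` until setup is complete.
    fn execute(&self) -> bool {
        let mut state = self.state.lock().unwrap();
        state.ensure_input_streams_initialized();
        state.streams_initialized
    }
}
```

Calling `execute` from several threads (e.g. via `std::thread::scope`) yields `true` for every caller and exactly one setup run. The cost is one lock acquisition per call even after setup, which buys the guarantee that no caller ever observes a half-initialized state.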
--
This is an automated message from the Apache Git Service.