crepererum commented on code in PR #16093:
URL: https://github.com/apache/datafusion/pull/16093#discussion_r2107279416


##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -595,30 +651,29 @@ impl ExecutionPlan for RepartitionExec {
         // Get existing ordering to use for merging
         let sort_exprs = self.sort_exprs().cloned().unwrap_or_default();
 
+        let state = Arc::clone(&self.state);
+        if !self.state_initialized.swap(true, Ordering::Relaxed) {
+            state.lock().ensure_input_streams_initialized(

Review Comment:
   > Seems highly unlikely (we've run this under high load for a while and it never happened)
   
   I don't fancy hunting Heisenbugs in production. In particular, cloud deployments w/ low or partial CPU allocations might be subject to this issue: a thread can easily be stalled for a few milliseconds between the `swap` and the `lock()`.
   
   > I'm not sure if we can replicate this reliably in a unit test
   
   Fair, but the fact that the code, despite being strongly typed, still allows that to happen is reason enough to change it. I'm OK w/o a test then. The expected behavior would then be that a poller might need to block during stream polling, either because thread 1 still holds the lock and is running the preparation step, or because that poller has to run the preparation step itself.
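
   A minimal sketch of that behavior, to make the suggestion concrete. Everything here is hypothetical and not the actual `RepartitionExec` code: `SharedState`, `poll_partition` and `run_concurrent_pollers` are made-up names, the "streams" are plain integers, and it uses `std::sync::Mutex` so it stands alone. The point is only that the "already initialized?" check lives behind the same lock as the initialization, so no poller can observe a set flag while the state is still unprepared.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the shared repartition state.
#[derive(Default)]
struct SharedState {
    /// `None` until the preparation step has run.
    input_streams: Option<Vec<usize>>,
    /// Counts how often the preparation step ran (for this demo only).
    init_calls: usize,
}

impl SharedState {
    /// Runs the preparation step at most once. Taking `&mut self` means the
    /// caller must already hold the lock, so check and init cannot be split.
    fn ensure_input_streams_initialized(&mut self, partitions: usize) -> &[usize] {
        if self.input_streams.is_none() {
            self.init_calls += 1;
            self.input_streams = Some((0..partitions).collect());
        }
        self.input_streams.as_deref().unwrap()
    }
}

/// Every poller takes the lock first. It either blocks while another thread
/// runs the preparation step, or runs that step itself.
fn poll_partition(state: &Mutex<SharedState>, partition: usize, partitions: usize) -> usize {
    let mut guard = state.lock().unwrap();
    let streams = guard.ensure_input_streams_initialized(partitions);
    streams[partition]
}

/// Spawns one poller per partition and reports how often init ran and what
/// each poller observed.
fn run_concurrent_pollers(partitions: usize) -> (usize, Vec<usize>) {
    let state = Arc::new(Mutex::new(SharedState::default()));
    let handles: Vec<_> = (0..partitions)
        .map(|p| {
            let state = Arc::clone(&state);
            thread::spawn(move || poll_partition(&state, p, partitions))
        })
        .collect();
    let seen: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    let init_calls = state.lock().unwrap().init_calls;
    (init_calls, seen)
}
```

   With this shape a separate `AtomicBool` is not needed for correctness. If the extra lock acquisition on the hot path turns out to matter, a flag could still serve as a fast path, but only if it is set after the initialization has finished rather than before.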



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

