gabotechs commented on code in PR #16093:
URL: https://github.com/apache/datafusion/pull/16093#discussion_r2107588557


##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -595,30 +651,29 @@ impl ExecutionPlan for RepartitionExec {
         // Get existing ordering to use for merging
         let sort_exprs = self.sort_exprs().cloned().unwrap_or_default();
 
+        let state = Arc::clone(&self.state);
+        if !self.state_initialized.swap(true, Ordering::Relaxed) {
+            state.lock().ensure_input_streams_initialized(

Review Comment:
   Pushed a small change in 
https://github.com/apache/datafusion/pull/16093/commits/1b276cc15683d5b199ead4f5970c4e9c9cb0542b.
 Now:
   
   1. Thread 1 and 2 call `.execute()` at the same time
   2. Thread 1 wins the atomic bool, but still has not acquired the lock (it's 
about to, it's just on the next line)
   3. while Thread 1 is between lines 655 and 656 Thread 2:
       - skips the if statement
       - builds the stream
       - polls the stream
       - acquires the lock
   4. Thread 1 finally moves from line 655 to 656 and tries to acquire the 
lock, but it's locked by Thread 2, so it waits until it gets released
   5. Thread 2 does all the initialization work inside the stream polling, 
releasing the lock
   6. Thread 1 acquires the lock, but there's nothing to do, as all the 
initialization already happened, so it moves on



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to