gabotechs commented on code in PR #16093: URL: https://github.com/apache/datafusion/pull/16093#discussion_r2107588557
########## datafusion/physical-plan/src/repartition/mod.rs: ########## @@ -595,30 +651,29 @@ impl ExecutionPlan for RepartitionExec { // Get existing ordering to use for merging let sort_exprs = self.sort_exprs().cloned().unwrap_or_default(); + let state = Arc::clone(&self.state); + if !self.state_initialized.swap(true, Ordering::Relaxed) { + state.lock().ensure_input_streams_initialized( Review Comment: Pushed a small change in https://github.com/apache/datafusion/pull/16093/commits/1b276cc15683d5b199ead4f5970c4e9c9cb0542b. Now: 1. Thread 1 and 2 call `.execute()` at the same time 2. Thread 1 wins the atomic bool, but still has not acquired the lock (it's about to, it's just on the next line) 3. while Thread 1 is between lines 655 and 656 Thread 2: - skips the if statement - builds the stream - polls the stream - acquires the lock 4. Thread 1 finally moves from line 655 to 656 and tries to acquire the lock, but it's locked by Thread 2, so it waits until it gets released 5. Thread 2 does all the initialization work inside the stream polling, releasing the lock 6. Thread 1 acquires the lock, but there's nothing to do, as all the initialization already happened, so it moves on -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org