crepererum commented on code in PR #16093: URL: https://github.com/apache/datafusion/pull/16093#discussion_r2107279416
########## datafusion/physical-plan/src/repartition/mod.rs: ##########
@@ -595,30 +651,29 @@ impl ExecutionPlan for RepartitionExec {
         // Get existing ordering to use for merging
         let sort_exprs = self.sort_exprs().cloned().unwrap_or_default();
+        let state = Arc::clone(&self.state);
+        if !self.state_initialized.swap(true, Ordering::Relaxed) {
+            state.lock().ensure_input_streams_initialized(

Review Comment:

> Seems highly unlikely (we've run this under high load for a while and it was never happened)

I don't fancy hunting Heisenbugs in production. Cloud deployments with low or partial CPU allocations in particular might be subject to this issue: a thread can easily be stalled for a few milliseconds.

> I'm not sure if we can replicate this reliably in a unit test

Fair, but the fact that the code is strongly typed and still allows that to happen is a reason to change it. I'm OK without a test then. The expected behavior would then be that a poller might need to block while polling the stream, because either thread 1 still holds the lock and is running the preparation step, or the poller itself has to run that step.

-- 
This is an automated message from the Apache Git Service.
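To make the race concrete: because the flag is swapped before the lock is taken, thread 1 can set `state_initialized` to `true` and then be descheduled before it acquires `state.lock()`. Thread 2 then sees `true`, skips `ensure_input_streams_initialized`, and polls input streams that do not exist yet. Below is a minimal sketch of the behavior the comment asks for. It assumes a simplified stand-in state: `RepartitionState`, `input_streams`, `init_runs`, and `execute` are hypothetical names, not DataFusion's actual types, and it uses `std::sync::Mutex` for self-containment. The key change is that the "already initialized?" check moves under the same lock as the preparation step, so a late caller either blocks until that step finishes or runs it itself.

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the shared repartition state.
/// `input_streams` stays `None` until the (possibly slow) preparation step runs.
#[derive(Default)]
struct RepartitionState {
    input_streams: Option<Vec<usize>>,
    /// Counts how often the preparation step actually executed.
    init_runs: usize,
}

impl RepartitionState {
    /// Hypothetical analogue of `ensure_input_streams_initialized`: idempotent,
    /// so every caller may invoke it while holding the lock.
    fn ensure_input_streams_initialized(&mut self, num_inputs: usize) -> &[usize] {
        if self.input_streams.is_none() {
            self.init_runs += 1;
            // Simulate a slow preparation step; other callers block on the lock meanwhile.
            thread::sleep(Duration::from_millis(5));
            self.input_streams = Some((0..num_inputs).collect());
        }
        self.input_streams.as_deref().unwrap()
    }
}

/// Hypothetical poll-time entry point: the "initialized?" check and the
/// preparation step happen under one lock acquisition, so no caller can
/// observe "initialized" while the streams are still missing.
fn execute(state: &Mutex<RepartitionState>, num_inputs: usize) -> usize {
    let mut guard = state.lock().unwrap();
    guard.ensure_input_streams_initialized(num_inputs).len()
}

fn main() {
    let state = Arc::new(Mutex::new(RepartitionState::default()));
    // Eight "pollers" race to initialize the same state.
    let handles: Vec<_> = (0..8)
        .map(|_| {
            let state = Arc::clone(&state);
            thread::spawn(move || execute(&state, 4))
        })
        .collect();
    for handle in handles {
        // Every poller sees fully prepared input streams.
        assert_eq!(handle.join().unwrap(), 4);
    }
    // The preparation step ran exactly once.
    assert_eq!(state.lock().unwrap().init_runs, 1);
    println!("preparation ran {} time(s)", state.lock().unwrap().init_runs);
}
```

If a lock-free fast path is still wanted, the flag would have to be set only after the preparation step has completed (stored with `Release` and loaded with `Acquire`), not swapped before it. Alternatively, `std::sync::OnceLock::get_or_init` provides the same "run once, block everyone else until it finishes" semantics out of the box.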