crepererum commented on code in PR #16093:
URL: https://github.com/apache/datafusion/pull/16093#discussion_r2106955247
##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -595,30 +651,29 @@ impl ExecutionPlan for RepartitionExec {
         // Get existing ordering to use for merging
         let sort_exprs = self.sort_exprs().cloned().unwrap_or_default();
+        let state = Arc::clone(&self.state);
+        if !self.state_initialized.swap(true, Ordering::Relaxed) {
+            state.lock().ensure_input_streams_initialized(

Review Comment:
The observation made by @gabotechs here https://github.com/apache/datafusion/pull/16093#discussion_r2096083785 is correct:

1. The original intention was to make `execute` not contend.
2. The delay is indeed a breakage of the API contract.

However, I want to broaden the discussion a bit. This is racy: the first thread could check the boolean but not yet get the lock, while other threads skip the `if` body and start to poll. At that point they get the lock (around line 670 WITH the PR applied, near the comment "lock mutexes"), and now you're either blocking the async IO runtime with initialization work or, with your implementation, you just get the error `RepartitionExecState::init_input_streams must be called before consuming input streams` (this needs to be fixed before merging, because it might happen under high load!). And even if you found a non-racy API that combines the boolean with the lock, you would still have the same semantic race.

I think the question is: are we allowed to poll streams even if not all `execute` calls have finished? I would say yes. In theory you could even interleave:

1. execute partition 0
2. poll partition 0
3. execute partition 1
4. poll partition 1
5. ...

I think that, in general, this problem cannot be fully fixed: you either block during `execute` or you potentially block during `poll`, at least as long as the `execute` method needs to be called PER PARTITION. TBH, that might have been a somewhat unfortunate choice; I would rather call it once and return a vector of streams.

So the question is: is this PR better than the status quo?
I would say yes, but I would like to see at least one additional test to simulate the race described above so it doesn't error.
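The race described above can be reproduced deterministically, without sleeps or timing tricks. What follows is a minimal, std-only sketch of such a test under heavy simplifying assumptions: `SharedState`, `execute_racy`, `poll_strict`, `poll_lazy` and `simulate_race` are hypothetical names for a toy model, not DataFusion's `RepartitionExec` / `RepartitionExecState` API, and OS threads plus channels stand in for the async runtime. Two channels park thread A between flipping the flag and taking the lock, which is the window the review points at.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical, heavily simplified model of the shared repartition state.
/// `streams` stays `None` until the (potentially expensive) setup has run.
struct SharedState {
    initialized_flag: AtomicBool,
    streams: Mutex<Option<Vec<u32>>>,
}

impl SharedState {
    fn new() -> Self {
        Self {
            initialized_flag: AtomicBool::new(false),
            streams: Mutex::new(None),
        }
    }

    /// Idempotent setup; callers must already hold the lock.
    fn init_locked(streams: &mut Option<Vec<u32>>) {
        if streams.is_none() {
            *streams = Some(vec![0, 1, 2]); // placeholder for real setup work
        }
    }

    /// Mirrors the pattern under review: flip the flag first, lock later.
    /// `before_lock` lets a test park the thread inside that window.
    fn execute_racy(&self, before_lock: impl FnOnce()) {
        if !self.initialized_flag.swap(true, Ordering::Relaxed) {
            before_lock();
            Self::init_locked(&mut self.streams.lock().unwrap());
        }
    }

    /// Polling that assumes some `execute` call already ran the setup.
    fn poll_strict(&self) -> Result<usize, String> {
        let guard = self.streams.lock().unwrap();
        match &*guard {
            Some(streams) => Ok(streams.len()),
            None => Err("input streams not initialized".to_string()),
        }
    }

    /// Alternative: run the setup lazily under the same lock. Polling can no
    /// longer error, but it may now block on the setup work instead.
    fn poll_lazy(&self) -> usize {
        let mut guard = self.streams.lock().unwrap();
        Self::init_locked(&mut guard);
        match &*guard {
            Some(streams) => streams.len(),
            None => unreachable!("init_locked always populates the state"),
        }
    }
}

/// Thread A wins the flag but has not locked yet when "thread B" (here the
/// main thread) runs its own `execute`, skips the setup, and polls.
fn simulate_race(lazy: bool) -> Result<usize, String> {
    let state = Arc::new(SharedState::new());
    let (reached_tx, reached_rx) = mpsc::channel::<()>();
    let (resume_tx, resume_rx) = mpsc::channel::<()>();

    let thread_a = {
        let state = Arc::clone(&state);
        thread::spawn(move || {
            state.execute_racy(|| {
                reached_tx.send(()).unwrap(); // flag is set, lock not taken yet
                resume_rx.recv().unwrap(); // stay in the window until B has polled
            });
        })
    };

    reached_rx.recv().unwrap();
    state.execute_racy(|| unreachable!("thread A already flipped the flag"));
    let result = if lazy {
        Ok(state.poll_lazy())
    } else {
        state.poll_strict()
    };

    resume_tx.send(()).unwrap();
    thread_a.join().unwrap();
    result
}

fn main() {
    println!("strict: {:?}", simulate_race(false)); // strict: Err("input streams not initialized")
    println!("lazy:   {:?}", simulate_race(true)); // lazy:   Ok(3)
}
```

With `poll_strict` the simulated race surfaces as an error, the failure mode flagged in the review; with `poll_lazy` the poll succeeds but performs the setup itself, i.e. the "block during `poll`" side of the trade-off. A real test for the PR would need an equivalent hook inside the operator to hold a thread in that window; this sketch does not assume such a hook exists.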