crepererum commented on code in PR #16093:
URL: https://github.com/apache/datafusion/pull/16093#discussion_r2106955247


##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -595,30 +651,29 @@ impl ExecutionPlan for RepartitionExec {
         // Get existing ordering to use for merging
         let sort_exprs = self.sort_exprs().cloned().unwrap_or_default();
 
+        let state = Arc::clone(&self.state);
+        if !self.state_initialized.swap(true, Ordering::Relaxed) {
+            state.lock().ensure_input_streams_initialized(

Review Comment:
   The observation made by @gabotechs here 
https://github.com/apache/datafusion/pull/16093#discussion_r2096083785 is 
correct:
   
   1. The original intention was to make `execute` not contend.
   2. The delay is indeed a breakage of the API contract.
   
   However, I want to broaden the discussion a bit:
   
   This is racy: the first thread could swap the boolean but not yet hold the 
lock, while other threads skip the `if` body and start to poll. At that point 
they take the lock (around line 670 WITH PR applied, around the comment "lock 
mutexes"), and you are either blocking the async IO runtime with initialization 
work or, with your implementation, you just get the error 
`RepartitionExecState::init_input_streams must be called before consuming input 
streams`. This needs to be fixed before merging because it might happen under 
high load!
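
   To make the ordering concrete, here is a minimal, single-threaded sketch 
that replays the problematic interleaving step by step. It uses only `std`; 
`Exec`, `State`, `claim_init`, `run_init`, `poll`, and `simulate_race` are 
hypothetical stand-ins for illustration, not the actual `RepartitionExec` code:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

/// Hypothetical stand-in for the shared state (not the DataFusion type).
#[derive(Default)]
struct State {
    streams_initialized: bool,
}

/// Hypothetical stand-in for the operator: a flag plus a separately locked state.
struct Exec {
    state_initialized: AtomicBool,
    state: Mutex<State>,
}

impl Exec {
    /// First half of `execute`: swap the flag.
    /// Returns true if this caller is now responsible for initialization.
    fn claim_init(&self) -> bool {
        !self.state_initialized.swap(true, Ordering::Relaxed)
    }

    /// Second half of `execute`: take the lock and initialize.
    fn run_init(&self) {
        self.state.lock().unwrap().streams_initialized = true;
    }

    /// What a poll does: take the lock and require initialized state.
    fn poll(&self) -> Result<(), &'static str> {
        if self.state.lock().unwrap().streams_initialized {
            Ok(())
        } else {
            Err("init_input_streams must be called before consuming input streams")
        }
    }
}

/// Replays the interleaving: A claims the flag, B skips the `if` body and
/// polls before A has taken the lock, then A finally initializes.
fn simulate_race() -> (bool, bool, Result<(), &'static str>, Result<(), &'static str>) {
    let exec = Exec {
        state_initialized: AtomicBool::new(false),
        state: Mutex::new(State::default()),
    };
    let a_claims = exec.claim_init(); // thread A wins the swap, then is descheduled
    let b_claims = exec.claim_init(); // thread B sees `true` and skips initialization
    let b_poll_early = exec.poll(); // B polls first: the state is still uninitialized
    exec.run_init(); // A finally gets to run the initialization
    let b_poll_late = exec.poll(); // polling after that succeeds
    (a_claims, b_claims, b_poll_early, b_poll_late)
}
```

   Calling `simulate_race()` shows that the early poll fails while the late 
one succeeds. The flag and the lock are two separate steps, so nothing orders 
B's poll after A's initialization.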
   
   And even if you found a non-racy API that combines the boolean w/ the 
lock, you would still have the same semantic race. I think the question is: are 
we allowed to poll streams before all `execute` calls have finished? I would 
say: yes. In theory you could even interleave:
   
   1. execute partition 0
   2. poll partition 0
   3. execute partition 1
   4. poll partition 1
   5. ...
   
   I think in general, this problem cannot be fully fixed though: you either 
block during `execute` or you potentially block during `poll`, at least as long 
as the `execute` method needs to be called PER PARTITION. TBH that might have 
been a somewhat unfortunate choice; I would rather call it once and return a 
vector of streams.
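
   As a rough illustration of that trade-off, here is a sketch in which 
`execute` never initializes and the first `poll` does the work instead, using 
`std::sync::OnceLock` as the "non-racy API". `LazyExec`, its methods, and 
`interleave` are hypothetical names for this example only, and the string 
"streams" merely stand in for real record batch streams:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

/// Hypothetical operator whose shared streams are created on first use.
struct LazyExec {
    init_runs: AtomicUsize,
    streams: OnceLock<Vec<String>>,
}

impl LazyExec {
    fn new() -> Self {
        Self {
            init_runs: AtomicUsize::new(0),
            streams: OnceLock::new(),
        }
    }

    /// `execute` stays cheap: it only hands back a partition handle.
    fn execute(&self, partition: usize) -> usize {
        partition
    }

    /// The first `poll` pays for initialization. Concurrent callers block
    /// inside `get_or_init` until the initializing caller is done.
    fn poll(&self, partition: usize) -> &str {
        let streams = self.streams.get_or_init(|| {
            self.init_runs.fetch_add(1, Ordering::Relaxed);
            (0..2).map(|p| format!("stream-{p}")).collect()
        });
        &streams[partition]
    }
}

/// Interleaves execute/poll per partition, as in the numbered list above.
fn interleave() -> (Vec<String>, usize) {
    let exec = LazyExec::new();
    let mut seen = Vec::new();
    for partition in 0..2 {
        let handle = exec.execute(partition);
        seen.push(exec.poll(handle).to_string());
    }
    (seen, exec.init_runs.load(Ordering::Relaxed))
}
```

   This removes the error case, since initialization runs exactly once no 
matter which call arrives first, but it does so by moving the potentially 
blocking work into `poll`, which is exactly the cost described above.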
   
   So the question is: is this PR better than the status quo? I would say yes, 
but I would like to see at least one additional test that simulates the race 
described above and shows that it does not error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

