attilapiros commented on PR #50033:
URL: https://github.com/apache/spark/pull/50033#issuecomment-2695991251

   
   
   
   > > The provided unittest is invalid as it uses multiple threads to process 
events meanwhile the production code has a single dedicated thread so a race 
condition is not possible. ( apart from creating the right env. for 
FetchFailedException, the source code attempts to stop executor on the host in 
case of Exception, so without source code changes, end to end reproduction in 
single VM is very difficult for me).
   > > If you still think it is a race condition and you can reproduce the 
problem with a standalone test I suggest to add log lines two those places 
where you think the two threads are competing and use the "%t" formatter in 
log4j2 to include the thread names in the log. In this case please attach the 
reproduction code without any production code change (only the new logging 
lines should be added but it is fine if the reproduction should be retried 1000 
times as race conditions are flaky in nature but I prefer the original 
production code) and attach the section of the logs where you think the race 
occurs.
   > 
   > The end to end bug reproduction without product code change is not 
feasible for me ( atleast at this point), in a single VM unit test.
   > 
   > The race condition is possible ( and happens) because in the DagScheduler 
::handleTaskCompletion method, there is asynchronicity introduced due to 
following snippet of code 
   ``` 
   messageScheduler.schedule(
       new Runnable {
         override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
       },
       DAGScheduler.RESUBMIT_TIMEOUT,
       TimeUnit.MILLISECONDS
     )
   ```
   
   Here what we have is producer-consumer pattern.
   
   The `messageScheduler` is a queue connecting the producer(s) to the 
consumer. Regarding the producer side we are fine you can post from any number 
of threads (you can have multiple producers) but the processing of those event 
(the consumer side) is in question, if you assume the race condition is in one 
or multiple handle method calls where you introduced the locking (such as 
`handleTaskCompletion` which is called from 
https://github.com/apache/spark/blob/1da65be8921813a28472678cad170e19576fb173/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L3158
 method via 
https://github.com/apache/spark/blob/1da65be8921813a28472678cad170e19576fb173/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L3149)
  but that is single threaded because `DAGSchedulerEventProcessLoop` extends 
the `EventLoop` where the `onReceive()` method is called from a single thread:
   
   
https://github.com/apache/spark/blob/1da65be8921813a28472678cad170e19576fb173/core/src/main/scala/org/apache/spark/util/EventLoop.scala#L42-L66
   
   The locking implementation itself also contains some bugs but first let's 
focus on understanding the problem at hand.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to