attilapiros commented on PR #50033: URL: https://github.com/apache/spark/pull/50033#issuecomment-2695991251
> > The provided unittest is invalid as it uses multiple threads to process events, whereas the production code has a single dedicated thread, so a race condition is not possible. (Apart from creating the right env for `FetchFailedException`, the source code attempts to stop the executor on the host in case of an exception, so without source code changes an end-to-end reproduction in a single VM is very difficult for me.)
> >
> > If you still think it is a race condition and you can reproduce the problem with a standalone test, I suggest adding log lines to those places where you think the two threads are competing and using the `%t` formatter in log4j2 to include the thread names in the log. In this case please attach the reproduction code without any production code change (only the new logging lines should be added; it is fine if the reproduction has to be retried 1000 times, as race conditions are flaky in nature, but I prefer the original production code) and attach the section of the logs where you think the race occurs.
>
> The end-to-end bug reproduction without production code change is not feasible for me (at least at this point) in a single-VM unit test.
>
> The race condition is possible (and happens) because in the `DAGScheduler::handleTaskCompletion` method there is asynchronicity introduced by the following snippet of code:
>
> ```scala
> messageScheduler.schedule(
>   new Runnable {
>     override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
>   },
>   DAGScheduler.RESUBMIT_TIMEOUT,
>   TimeUnit.MILLISECONDS
> )
> ```

What we have here is a producer-consumer pattern. The `messageScheduler` is a queue connecting the producer(s) to the consumer.
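To make the shape of this pattern concrete, here is a minimal, self-contained sketch in plain Scala. All names here (`EventLoopSketch`, `TaskDone`, etc.) are hypothetical and this is not Spark's actual `EventLoop` or `DAGSchedulerEventProcessLoop`: any number of producer threads, plus a scheduled delayed post mimicking `messageScheduler.schedule(...)`, all feed one queue that is drained by a single dedicated consumer thread, so every event is handled on that one thread.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, LinkedBlockingQueue, TimeUnit}

// Minimal sketch of the pattern (hypothetical names, not Spark's classes):
// many producers -> one queue -> one dedicated consumer thread.
object EventLoopSketch {
  sealed trait Event
  case class TaskDone(id: Int) extends Event
  case object ResubmitFailedStages extends Event
  case object Stop extends Event

  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[Event]()
    val handlerThreads = new ConcurrentHashMap[String, Boolean]() // which threads ran the handler
    var handled = 0                        // touched only by the consumer thread: no lock needed
    val resubmitted = new CountDownLatch(1)
    val stopped = new CountDownLatch(1)

    // Single dedicated consumer, like EventLoop's event thread calling onReceive()
    val eventThread = new Thread("dag-scheduler-event-loop") {
      override def run(): Unit = {
        var running = true
        while (running) {
          queue.take() match {
            case Stop => running = false
            case ResubmitFailedStages =>
              handlerThreads.put(Thread.currentThread().getName, true)
              handled += 1
              resubmitted.countDown()
            case TaskDone(_) =>
              handlerThreads.put(Thread.currentThread().getName, true)
              handled += 1
          }
        }
        stopped.countDown()
      }
    }
    eventThread.start()

    // Producers: any number of threads may post concurrently
    val producers = (1 to 4).map { p =>
      new Thread(s"producer-$p") {
        override def run(): Unit = (1 to 100).foreach(i => queue.put(TaskDone(p * 1000 + i)))
      }
    }
    producers.foreach(_.start())

    // Delayed post, mimicking messageScheduler.schedule(...) with RESUBMIT_TIMEOUT
    val messageScheduler = Executors.newScheduledThreadPool(1)
    messageScheduler.schedule(new Runnable {
      override def run(): Unit = queue.put(ResubmitFailedStages)
    }, 50, TimeUnit.MILLISECONDS)

    producers.foreach(_.join())
    resubmitted.await()                    // wait until the delayed event was handled
    queue.put(Stop)
    stopped.await()
    messageScheduler.shutdown()

    println(s"events handled: $handled")                 // 4 * 100 posts + 1 resubmit = 401
    println(s"handler threads: ${handlerThreads.size}")  // always 1: the single consumer
  }
}
```

Note that `handled` is mutated without any synchronization, yet the count is exact: only the single consumer thread ever touches it, which is exactly the property the discussion below hinges on.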
Regarding the producer side we are fine: you can post from any number of threads (you can have multiple producers). What is in question is the processing of those events (the consumer side), if you assume the race condition is in one or more of the handler methods where you introduced the locking (such as `handleTaskCompletion`, which is called from the method at https://github.com/apache/spark/blob/1da65be8921813a28472678cad170e19576fb173/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L3158 via https://github.com/apache/spark/blob/1da65be8921813a28472678cad170e19576fb173/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L3149). But that side is single threaded, because `DAGSchedulerEventProcessLoop` extends `EventLoop`, where the `onReceive()` method is called from a single thread:

https://github.com/apache/spark/blob/1da65be8921813a28472678cad170e19576fb173/core/src/main/scala/org/apache/spark/util/EventLoop.scala#L42-L66

The locking implementation itself also contains some bugs, but first let's focus on understanding the problem at hand.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org