ahshahid opened a new pull request, #50033:
URL: https://github.com/apache/spark/pull/50033

   …in retrying all partitions in case of indeterministic shuffle keys
   
   
   ### What changes were proposed in this pull request?
   In the DAGScheduler and Stage, the actions performed on successful task completion
   are done under a read lock on the active Stage, while the actions for a failed task,
   which result in a retry of the stage's tasks, are done under the write lock for that
   stage.
   Because a ReadWriteLock is not upgradable (from read to write), and in one particular
   case of successful task completion the thread holding the Stage's read lock invokes
   the code that creates a new stage attempt, which needs the write lock, a ThreadLocal
   is introduced in the Stage class. It keeps track of threads that already hold the
   read lock, so that they do not attempt to acquire the write lock.
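
   A minimal sketch of the pattern described above (the class, field, and method names
   here are illustrative only and may differ from what this PR actually adds to Stage):

   ```scala
   import java.util.concurrent.locks.ReentrantReadWriteLock

   // Hedged sketch of read/write locking plus ThreadLocal tracking of read-lock
   // holders; names are hypothetical, not the exact ones introduced in Stage.
   class StageLock {
     private val lock = new ReentrantReadWriteLock()
     // Remembers whether the current thread already holds the read lock, because
     // a ReentrantReadWriteLock cannot be upgraded from read to write.
     private val holdsReadLock = new ThreadLocal[Boolean] {
       override def initialValue(): Boolean = false
     }

     def withReadLock[T](body: => T): T = {
       lock.readLock().lock()
       val alreadyHeld = holdsReadLock.get()
       holdsReadLock.set(true)
       try body finally {
         holdsReadLock.set(alreadyHeld)
         lock.readLock().unlock()
       }
     }

     def withWriteLock[T](body: => T): T = {
       // A thread that already holds the read lock (e.g. a successful task
       // completion that ends up creating a new stage attempt) must not try to
       // acquire the write lock, as that would deadlock; it runs the body under
       // its existing read lock instead.
       if (holdsReadLock.get()) body
       else {
         lock.writeLock().lock()
         try body finally lock.writeLock().unlock()
       }
     }
   }
   ```

   In this scheme the successful-completion path would run under withReadLock and the
   failure/retry path under withWriteLock, and a read-lock holder that reaches the
   attempt-creation code is not forced to upgrade.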
   
   With the above changes, it turns out that two existing tests have assertions that
   are, in my opinion, wrong, since not all partitions are subject to retry.
   
   
   ### Why are the changes needed?
   Once the above issue is fixed, it exposes a race condition where a successful task
   completion concurrent with a task failure, for an indeterminate stage, results in
   only some partitions being retried instead of all of them being re-executed. This
   results in data loss.
   The race condition identified is as follows (a minimal sketch follows the list):
   a) A successful result stage task has not yet set its entry to true in the boolean
   array tracking partition success/failure.
   b) A concurrent failed result task, belonging to an indeterminate stage, identifies
   all the stages which need to / can be rolled back. For the ResultStage, it looks
   into the array of successful partitions. As none is marked true, the ResultStage and
   its dependent stages are handed to the thread pool for retry.
   c) Between the time the stages to roll back are collected and the stages are
   retried, the successful task marks its boolean as true.
   d) As a result, the stage retry misses the partition that was marked successful.
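
   The lost-update window in (b)-(d) can be illustrated with a small, self-contained
   sketch; the array and variable names below are hypothetical and do not match the
   actual DAGScheduler fields:

   ```scala
   // Illustrative sketch of the lost-update window described in (a)-(d).
   object RaceSketch {
     def main(args: Array[String]): Unit = {
       // (a) four result partitions, none marked successful yet
       val partitionSucceeded = Array.fill(4)(false)

       // (b) the failure path sees no successful partition, so it decides the
       //     whole indeterminate ResultStage must be rolled back and schedules
       //     a resubmit (represented here as a decision flag)
       val rollBackWholeStage = !partitionSucceeded.exists(identity)

       // (c) before the resubmit runs, the concurrent success lands
       partitionSucceeded(2) = true

       // (d) the resubmit only re-runs partitions not marked successful, so
       //     partition 2 keeps its stale, pre-failure output -> data loss
       val retried = partitionSucceeded.indices.filterNot(i => partitionSucceeded(i))
       println(s"rollBackWholeStage=$rollBackWholeStage, retried=$retried")
       assert(retried == Seq(0, 1, 3)) // partition 2 is never recomputed
     }
   }
   ```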
   
   An existing test has incorrect assertions regarding the number of partitions being
   retried for an indeterminate stage.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   A functional test exposes the following:
   - Data loss due to the buggy Stage.isInDeterminate
   ([SPARK-51016](https://issues.apache.org/jira/browse/SPARK-51016)).
   - A race condition causing data loss even when the PR for the above bug is applied.
   
   Two files are attached to reproduce the functional bug and demonstrate the race
   condition causing data corruption:
   
   1. bugrepro.patch
   This is needed to coax the single-VM test into reproducing the issue. It contains a
   lot of interception and tweaks to ensure that the system can hit the data loss
   situation (for example, making each partition write only one shuffle file whose keys
   evaluate to the same hashCode, and deleting that shuffle file at the right time).
   
   2. The BugTest itself.
   a) If bugrepro.patch is applied to the current master and the BugTest is run, it
   fails immediately with an assertion failure: 6 rows show up in the result instead
   of 12.
   
   b) If bugrepro.patch is applied on top of PR
   [PR-SPARK-51016](https://github.com/apache/spark/pull/50029), the BugTest fails
   after one or more iterations, indicating the race condition in the DAGScheduler/Stage
   interaction.
   
   c) But if the same BugTest is run on a branch containing the fix for this bug as
   well as the PR [PR-SPARK-51016](https://github.com/apache/spark/pull/50029), it
   passes in all 100 iterations.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   

