Hi Tyson,

Thanks for reporting this! I reproduced it locally based on your code, with some changes that keep only the wrong-answer job. The code is below:
import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}

// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).cache.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 &&
      TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()

I think the reason for the wrong answer is that, in the CachedRDDBuilder, we miss tracking the `isOrderSensitive` characteristic on the newly created MapPartitionsRDD. (A sketch of why round-robin repartitioning is order-sensitive follows at the end of this thread.) Jira created: https://issues.apache.org/jira/browse/SPARK-28699. The fix will be based on Wenchen's work in SPARK-23243. Currently, we make the job fail when we find an indeterminate stage retry; feel free to review. Support for rerunning the indeterminate stage will come after SPARK-25341 <https://issues.apache.org/jira/browse/SPARK-25341>. If you need the indeterminate stage after a cache operation right now, you can test on this branch <https://github.com/xuanyuanking/spark/tree/SPARK-28699-RERUN>.

Best,
Yuanjian

Wenchen Fan <cloud0...@gmail.com> wrote on Monday, August 12, 2019 at 8:19 PM:

> Hi Tyson,
>
> Thanks for reporting it! I quickly checked the related scheduler code but
> can't find an obvious place that can go wrong with cached RDDs.
>
> Sean said that he can't reproduce it, but the second job fails. This is
> actually expected. We need a lot more changes to completely fix this
> problem, so currently the fix is to fail the job if the scheduler needs to
> retry an indeterminate shuffle map stage.
>
> It would be great to know if we can reproduce this bug with the master
> branch.
>
> Thanks,
> Wenchen
>
> On Sun, Aug 11, 2019 at 7:22 AM Xiao Li <lix...@databricks.com> wrote:
>
>> Hi, Tyson,
>>
>> Could you open a new JIRA with the correctness label? SPARK-23207 might
>> not cover all the scenarios, especially when you are using cache.
>>
>> Cheers,
>>
>> Xiao
>>
>> On Fri, Aug 9, 2019 at 9:26 AM <tcon...@gmail.com> wrote:
>>
>>> Hi Sean,
>>>
>>> To finish the job, I did need to set spark.stage.maxConsecutiveAttempts
>>> to a large number, e.g. 100; a suggestion from Jiang Xingbo.
>>>
>>> I haven't seen any recent movement/PRs on this issue, but I'll see if we
>>> can repro with a more recent version of Spark.
>>>
>>> Best regards,
>>> Tyson
>>>
>>> -----Original Message-----
>>> From: Sean Owen <sro...@gmail.com>
>>> Sent: Friday, August 9, 2019 7:49 AM
>>> To: tcon...@gmail.com
>>> Cc: dev <dev@spark.apache.org>
>>> Subject: Re: [SPARK-23207] Repro
>>>
>>> Interesting, but I'd put this on the JIRA, and also test vs. master first.
>>> It's entirely possible this is something else that was subsequently fixed,
>>> and maybe even backported for 2.4.4.
>>> (I can't quite reproduce it - it just makes the second job fail, which is
>>> also puzzling.)
>>>
>>> On Fri, Aug 9, 2019 at 8:11 AM <tcon...@gmail.com> wrote:
>>> >
>>> > Hi,
>>> >
>>> > We are able to reproduce this bug in Spark 2.4 using the following
>>> > program:
>>> >
>>> > import scala.sys.process._
>>> > import org.apache.spark.TaskContext
>>> >
>>> > val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000,
>>> > x)}.repartition(20)
>>> > res.distinct.count
>>> >
>>> > // kill an executor in the stage that performs repartition(239)
>>> > val df = res.repartition(113).cache.repartition(239).map { x =>
>>> >   if (TaskContext.get.attemptNumber == 0 &&
>>> >       TaskContext.get.partitionId < 1) {
>>> >     throw new Exception("pkill -f java".!!)
>>> >   }
>>> >   x
>>> > }
>>> > df.distinct.count()
>>> >
>>> > The first df.distinct.count correctly produces 100000000.
>>> > The second df.distinct.count incorrectly produces 99999769.
>>> >
>>> > If the cache step is removed, then the bug does not reproduce.
>>> >
>>> > Best regards,
>>> > Tyson
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
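
As context for Yuanjian's point about `isOrderSensitive`: round-robin repartitioning assigns a row to an output partition based on the row's position in its input partition, so if a lost stage is recomputed and the rows arrive in a different order, the same row can land in a different partition. The standalone Scala sketch below (no Spark required) illustrates the effect; the RoundRobinSketch object and its roundRobin helper are simplified illustrative stand-ins, not Spark's actual implementation.

object RoundRobinSketch {
  // Assign each element to one of numPartitions buckets by its position,
  // mimicking (in spirit) how round-robin repartitioning depends on row order.
  def roundRobin[T](rows: Seq[T], numPartitions: Int): Map[Int, Seq[T]] =
    rows.zipWithIndex
      .groupBy { case (_, pos) => pos % numPartitions }
      .map { case (p, xs) => p -> xs.map(_._1) }

  def main(args: Array[String]): Unit = {
    val original  = (1 to 12).toVector
    // Same rows, different arrival order, as after a stage recomputation.
    val reordered = scala.util.Random.shuffle(original)

    val firstRun = roundRobin(original, 3)
    val retryRun = roundRobin(reordered, 3)

    // Pretend only partition 0 was lost and recomputed from the reordered
    // input, while partitions 1 and 2 keep their first-run output.
    val mixed = firstRun.updated(0, retryRun(0))

    // The first run preserves the data set; the mixed run usually does not:
    // some rows appear twice and others disappear, which is the same kind of
    // row loss behind the 99999769 count reported in the thread above.
    println(firstRun.values.flatten.toVector.sorted == original.sorted) // true
    println(mixed.values.flatten.toVector.sorted == original.sorted)    // usually false
  }
}

To let a repro like the ones in this thread run to completion despite the injected task failures, the spark.stage.maxConsecutiveAttempts setting Tyson mentions can be raised, for example by launching the shell with --conf spark.stage.maxConsecutiveAttempts=100 (one way to set it; adjust to your deployment).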