Hi Tyson,

Thanks for reporting this!
I reproduced it locally using your code, trimmed down to keep only the job
that returns the wrong answer. The code is below:

import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).cache.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 &&
      TaskContext.get.partitionId < 1 &&
      TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()

I think the wrong answer occurs because CachedRDDBuilder does not propagate
the `isOrderSensitive` flag to the newly created MapPartitionsRDD. I filed a
JIRA for this:
https://issues.apache.org/jira/browse/SPARK-28699.
The fix builds on Wenchen's work in SPARK-23243. For now, we make the job
fail when we detect an indeterminate stage retry. Reviews are welcome.
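
To illustrate why order sensitivity matters here, below is a minimal
plain-Scala sketch (no Spark involved). It assumes a simplified round-robin
assignment by input position; the names RoundRobinSketch, roundRobin and
mixedFetch are hypothetical and are not Spark APIs. It shows how a retried
task that sees the same records in a different order can lose one record and
duplicate another when reducers mix output from both attempts.

```scala
// Plain-Scala sketch; RoundRobinSketch and its methods are hypothetical names.
object RoundRobinSketch {
  // Assign each record to a target partition purely by its input position.
  def roundRobin[A](input: Seq[A], numPartitions: Int): Map[Int, Seq[A]] =
    input.zipWithIndex
      .groupBy { case (_, idx) => idx % numPartitions }
      .map { case (p, recs) => p -> recs.map(_._1) }

  // What the reducers see when reducer 0 read the first attempt's output and
  // all other reducers read the retried attempt's output.
  def mixedFetch[A](first: Map[Int, Seq[A]], retry: Map[Int, Seq[A]]): Seq[A] =
    first.getOrElse(0, Seq.empty) ++
      (retry - 0).toSeq.sortBy(_._1).flatMap(_._2)

  def main(args: Array[String]): Unit = {
    val first = roundRobin(Seq(1, 2, 3, 4, 5, 6), 3)
    val retry = roundRobin(Seq(2, 1, 3, 4, 5, 6), 3) // same records, new order
    val mixed = mixedFetch(first, retry)
    println(mixed.sorted.mkString(", ")) // prints "1, 1, 3, 4, 5, 6"
    println(mixed.distinct.size)         // prints 5: record 2 is lost
  }
}
```

In this toy run the distinct count drops from 6 to 5, which is the same kind
of undercount as in the reported result.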

Support for rerunning an indeterminate stage will come after
SPARK-25341 <https://issues.apache.org/jira/browse/SPARK-25341>. If you
need to rerun an indeterminate stage after a cache operation right now, you
can test on this branch
<https://github.com/xuanyuanking/spark/tree/SPARK-28699-RERUN>.
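
As a rough picture of the difference between failing the job and rerunning the
stage, here is a toy model in plain Scala. Everything in it (RetryPolicySketch,
the Level and Action types, outputLevel, onStageRetry) is hypothetical and only
mirrors the idea described in this thread; it is not the scheduler's actual
code.

```scala
// Toy model only: these types are hypothetical, not Spark's scheduler classes.
object RetryPolicySketch {
  sealed trait Level
  case object Determinate extends Level
  case object Unordered extends Level
  case object Indeterminate extends Level

  sealed trait Action
  case object RetryFailedTasks extends Action
  case object AbortJob extends Action
  case object RerunWholeStage extends Action

  // An order-sensitive operator over unordered input yields indeterminate
  // output; otherwise the operator inherits its parent's level.
  def outputLevel(parent: Level, isOrderSensitive: Boolean): Level =
    if (isOrderSensitive && parent == Unordered) Indeterminate else parent

  // Decide what to do when a stage with the given output level must be retried.
  def onStageRetry(level: Level, rerunSupported: Boolean): Action =
    level match {
      case Indeterminate if rerunSupported => RerunWholeStage
      case Indeterminate                   => AbortJob
      case _                               => RetryFailedTasks
    }

  def main(args: Array[String]): Unit = {
    // Flag tracked: the stage is recognized as indeterminate and the job fails.
    println(onStageRetry(outputLevel(Unordered, isOrderSensitive = true), rerunSupported = false))
    // Flag lost: the stage looks safe, so only the failed tasks are retried.
    println(onStageRetry(outputLevel(Unordered, isOrderSensitive = false), rerunSupported = false))
  }
}
```

In this model, losing the flag is what lets a partial retry go ahead even
though the output may differ between attempts.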

Best,
Yuanjian

On Mon, Aug 12, 2019 at 8:19 PM Wenchen Fan <cloud0...@gmail.com> wrote:

> Hi Tyson,
>
> Thanks for reporting it! I quickly checked the related scheduler code but
> can't find an obvious place that can go wrong with cached RDD.
>
> Sean said that he can't reproduce it, but the second job fails. This is
> actually expected. We need a lot more changes to completely fix this
> problem, so currently the fix is to fail the job if the scheduler needs to
> retry an indeterminate shuffle map stage.
>
> It would be great to know if we can reproduce this bug with the master
> branch.
>
> Thanks,
> Wenchen
>
> On Sun, Aug 11, 2019 at 7:22 AM Xiao Li <lix...@databricks.com> wrote:
>
>> Hi, Tyson,
>>
>> Could you open a new JIRA with the correctness label? SPARK-23207 might not
>> cover all the scenarios, especially when you are using cache.
>>
>> Cheers,
>>
>> Xiao
>>
>> On Fri, Aug 9, 2019 at 9:26 AM <tcon...@gmail.com> wrote:
>>
>>> Hi Sean,
>>>
>>> To finish the job, I did need to set spark.stage.maxConsecutiveAttempts
>>> to a large number e.g., 100; a suggestion from Jiang Xingbo.
>>>
>>> I haven't seen any recent movement/PRs on this issue, but I'll see if we
>>> can repro with a more recent version of Spark.
>>>
>>> Best regards,
>>> Tyson
>>>
>>> -----Original Message-----
>>> From: Sean Owen <sro...@gmail.com>
>>> Sent: Friday, August 9, 2019 7:49 AM
>>> To: tcon...@gmail.com
>>> Cc: dev <dev@spark.apache.org>
>>> Subject: Re: [SPARK-23207] Repro
>>>
>>> Interesting but I'd put this on the JIRA, and also test vs master first.
>>> It's entirely possible this is something else that was subsequently fixed,
>>> and maybe even backported for 2.4.4.
>>> (I can't quite reproduce it - just makes the second job fail, which is
>>> also puzzling)
>>>
>>> On Fri, Aug 9, 2019 at 8:11 AM <tcon...@gmail.com> wrote:
>>> >
>>> > Hi,
>>> >
>>> >
>>> >
>>> > We are able to reproduce this bug in Spark 2.4 using the following
>>> > program:
>>> >
>>> >
>>> >
>>> > import scala.sys.process._
>>> >
>>> > import org.apache.spark.TaskContext
>>> >
>>> >
>>> >
>>> > val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000,
>>> > x)}.repartition(20)
>>> >
>>> > res.distinct.count
>>> >
>>> >
>>> >
>>> > // kill an executor in the stage that performs repartition(239)
>>> >
>>> > val df = res.repartition(113).cache.repartition(239).map { x =>
>>> >
>>> >   if (TaskContext.get.attemptNumber == 0 &&
>>> > TaskContext.get.partitionId < 1) {
>>> >
>>> >     throw new Exception("pkill -f java".!!)
>>> >
>>> >   }
>>> >
>>> >   x
>>> >
>>> > }
>>> >
>>> > df.distinct.count()
>>> >
>>> >
>>> >
>>> > The first df.distinct.count correctly produces 100000000
>>> >
>>> > The second df.distinct.count incorrectly produces 99999769
>>> >
>>> >
>>> >
>>> > If the cache step is removed then the bug does not reproduce.
>>> >
>>> >
>>> >
>>> > Best regards,
>>> >
>>> > Tyson
>>> >
>>> >
>>>
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>
>>>
>>
>
