Hi,

 

We are able to reproduce this bug in Spark 2.4 using the following program:

 

import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map { x => (x % 1000, x) }.repartition(20)
res.distinct.count

// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).cache.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
df.distinct.count()

 

The first count (res.distinct.count) correctly produces 100000000.
The second count (df.distinct.count()) incorrectly produces 99999769.
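
If it helps triage, the lost rows can be isolated with a set difference between the two results. This is only a rough diagnostic sketch, not part of the repro above, and note that any new action on df re-runs the map stage and so fires the executor kill again:

// rough diagnostic: rows in the correct result that are absent from the
// faulty one (about 100000000 - 99999769 = 231 rows in the run above)
val missing = res.distinct.except(df.distinct)
missing.count()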
 

If the cache step is removed, the bug does not reproduce.
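
For clarity, removing the cache step means running the same pipeline with only the .cache call dropped; a minimal sketch of that variant (dfNoCache is just an illustrative name, kill logic unchanged):

// identical pipeline without .cache; with this variant the final count
// stays at 100000000 even when the executor is killed
val dfNoCache = res.repartition(113).repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
dfNoCache.distinct.count()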

 

Best regards,

Tyson

 
