Hi, I have a Spark job that produces duplicates in the output when one or
more tasks from the repartition stage fail.
Here is the simplified code.

sparkContext.setCheckpointDir("hdfs://path-to-checkpoint-dir")

val inputRDDs: List[RDD[String]] = List.empty // an RDD per input dir

val updatedRDDs = inputRDDs.map { inputRDD => // some stuff happens here
  inputRDD
    .filter(???)
    .map(???)
}

val unionOfUpdatedRDDs = sparkContext.union(updatedRDDs)

unionOfUpdatedRDDs.checkpoint() // it didn't help (see the sketch below)


unionOfUpdatedRDDs
  .repartition(42) // task failed here,
  .saveAsNewAPIHadoopFile("/path") // task failed here too.

// what really causes duplicates in output?
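
For completeness, below is a self-contained sketch of the same pipeline that I can run for testing; the input data, the filter/map bodies, the output key/value types and the paths are all placeholders I made up, not the real job. One thing I am unsure about: my understanding is that RDD.checkpoint() is lazy and only materializes after the first action on the RDD, so in the code above it may not take effect before the repartition; the sketch forces it with a count() first (not claiming that this alone removes the duplicates).

import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RepartitionDuplicatesSketch {
  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext(new SparkConf().setAppName("repartition-duplicates"))
    sparkContext.setCheckpointDir("hdfs://path-to-checkpoint-dir") // placeholder dir

    // placeholder inputs standing in for "an RDD per input dir"
    val inputRDDs: List[RDD[String]] = List(
      sparkContext.parallelize(Seq("a", "b")),
      sparkContext.parallelize(Seq("c", "d"))
    )

    val updatedRDDs = inputRDDs.map { inputRDD =>
      inputRDD
        .filter(_.nonEmpty)   // placeholder for the real filter
        .map(_.toUpperCase)   // placeholder for the real map
    }

    val unionOfUpdatedRDDs = sparkContext.union(updatedRDDs)

    unionOfUpdatedRDDs.checkpoint()
    // checkpoint() is lazy: data is written to the checkpoint dir only after the
    // first action on this RDD, so run an action before the repartition/save job
    unionOfUpdatedRDDs.count()

    unionOfUpdatedRDDs
      .repartition(42)
      .map(line => (NullWritable.get(), new Text(line))) // pairs required by the Hadoop output format
      .saveAsNewAPIHadoopFile[TextOutputFormat[NullWritable, Text]]("/path") // placeholder path

    sparkContext.stop()
  }
}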
