Hi Mingyu,

According to my understanding:

Spark processes the result generated by each partition by passing it to a resultHandler (SparkContext.scala L1056). This resultHandler usually just puts the result into a driver-side array whose length is always partitions.size. This design effectively makes the actions idempotent. For example, count is implemented as

    def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

so even if the task for a partition is executed twice, the result placed in that partition's slot of the array is the same.
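To make this concrete, here is a minimal, self-contained sketch (a toy model, not Spark's actual runJob code; getIteratorSize and resultHandler below are simplified stand-ins) showing why a partition-indexed result array keeps count correct even when a speculated task reports a duplicate result:

    // Sketch of the "one slot per partition" result handling described
    // above; not Spark's real implementation.
    object ResultHandlerSketch {

      // Stand-in for the per-partition work, e.g. Utils.getIteratorSize.
      def getIteratorSize(it: Iterator[Int]): Long = it.size.toLong

      def main(args: Array[String]): Unit = {
        val partitions: Array[Seq[Int]] =
          Array(Seq(1, 2, 3), Seq(4, 5), Seq(6, 7, 8, 9))

        // Driver-side result array: one slot per partition.
        val results = new Array[Long](partitions.length)

        // resultHandler just stores the result at the partition's index.
        def resultHandler(partitionId: Int, result: Long): Unit =
          results(partitionId) = result

        // "Run" every partition once ...
        partitions.zipWithIndex.foreach { case (p, id) =>
          resultHandler(id, getIteratorSize(p.iterator))
        }
        // ... and pretend partition 1 was speculatively re-executed:
        // the same value lands in the same slot.
        resultHandler(1, getIteratorSize(partitions(1).iterator))

        // count() is just the sum of the slots, so it is still 9.
        println(results.sum)
      }
    }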
At the same time, I think the Spark implementation ensures that the operation applied to the return value of SparkContext.runJob is not triggered again when a duplicate task finishes. When a task finishes, the code path is TaskSetManager.handleSuccessfulTask -> DAGScheduler.taskEnded; taskEnded posts a CompletionEvent, and in the CompletionEvent handler DAGScheduler checks if (!job.finished(rt.outputId)), where rt.outputId is the partition id. So even if a duplicate task produces a CompletionEvent, the handler will find that job.finished(rt.outputId) is already true and ignore it. (A rough sketch of this check is below the quoted message.)

Maybe I am wrong… I only went through the code roughly, so corrections are welcome.

Best,

--
Nan Zhu

On Tuesday, July 15, 2014 at 1:55 PM, Mingyu Kim wrote:

> Hi all,
>
> I was curious about the details of Spark speculation. So, my understanding is
> that, when “speculated” tasks are newly scheduled on other machines, the
> original tasks are still running until the entire stage completes. This seems
> to leave some room for duplicated work because some spark actions are not
> idempotent. For example, it may be counting a partition twice in case of
> RDD.count or may be writing a partition to HDFS twice in case of RDD.save*().
> How does it prevent this kind of duplicated work?
>
> Mingyu
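Here is a rough, self-contained sketch of that dedup check (the CompletionEvent, ActiveJob, and handleCompletion names below are simplified stand-ins for illustration, not the real DAGScheduler/ResultTask classes):

    // Sketch of "if (!job.finished(rt.outputId))": the job remembers which
    // partitions have already reported, so a duplicate (speculated) result
    // for the same partition is dropped and the per-partition handler runs
    // at most once.
    object CompletionDedupSketch {

      case class CompletionEvent(outputId: Int, result: Long)

      class ActiveJob(numPartitions: Int, handler: (Int, Long) => Unit) {
        private val finished = Array.fill(numPartitions)(false)
        private var numFinished = 0

        def handleCompletion(event: CompletionEvent): Unit = {
          if (!finished(event.outputId)) { // mirrors !job.finished(rt.outputId)
            finished(event.outputId) = true
            numFinished += 1
            handler(event.outputId, event.result)
          } // else: duplicate task result, silently ignored
        }

        def isFinished: Boolean = numFinished == finished.length
      }

      def main(args: Array[String]): Unit = {
        val job = new ActiveJob(2, (id, r) => println(s"partition $id -> $r"))
        job.handleCompletion(CompletionEvent(0, 3L))
        job.handleCompletion(CompletionEvent(1, 2L))
        job.handleCompletion(CompletionEvent(1, 2L)) // duplicate: ignored
        println(job.isFinished) // true
      }
    }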