Hi Mingyu,

According to my understanding,  

Spark processes the results generated from each partition by passing them to the
resultHandler (SparkContext.scala L1056)

This resultHandler usually just puts the result into a driver-side array, whose
length is always partitions.size
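
To make this concrete, here is a minimal local sketch of the idea (runJobSketch and its parameters are hypothetical names for illustration only, not Spark's actual API):

import scala.reflect.ClassTag

// Minimal model of runJob's default result collection: each partition's result
// lands in a fixed-size driver-side array slot keyed by partition index.
def runJobSketch[T, U: ClassTag](partitionData: Seq[Seq[T]], func: Iterator[T] => U): Array[U] = {
  val results = new Array[U](partitionData.size)                    // length == partitions.size
  val resultHandler = (index: Int, res: U) => results(index) = res
  // In real Spark, func runs inside tasks on the executors; here we just apply it locally.
  partitionData.zipWithIndex.foreach { case (data, index) =>
    resultHandler(index, func(data.iterator))
  }
  results
}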

This design effectively ensures that the actions are idempotent.

For example, count is implemented as

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

Even if the task for a partition is executed twice, the result written into the
array is the same.
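
For instance, with the hypothetical runJobSketch above, a speculative duplicate of a partition's task just overwrites the same slot with the same value:

val data = Seq(Seq(1, 2, 3), Seq(4, 5))
val countFunc = (it: Iterator[Int]) => it.size.toLong

val results = runJobSketch(data, countFunc)
// A duplicate run of partition 0 simply overwrites slot 0 with the same value:
results(0) = countFunc(data(0).iterator)   // still 3L
val total = results.sum                    // still 5L, nothing is double-counted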



At the same time, I think the Spark implementation ensures that the operation
applied to the return value of SparkContext.runJob will not be triggered again
when the duplicate tasks finish.

Here is why:


When a task finishes, the code execution path is
TaskSetManager.handleSuccessfulTask -> DAGScheduler.taskEnded

taskEnded posts a CompletionEvent, and in its handler the DAGScheduler checks
if (!job.finished(rt.outputId)), where rt.outputId is the partition id.

So even if a duplicate task posts a CompletionEvent, the handler will find that
job.finished(rt.outputId) is already true and will skip it.
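
To illustrate the guard, here is a simplified standalone model (the names are illustrative; this is not Spark's actual DAGScheduler/JobWaiter code):

// Models the duplicate-completion guard described above: the first CompletionEvent
// for a partition marks it finished and applies the result handler; later duplicates
// for the same outputId are ignored.
class CompletionGuardSketch[U](numPartitions: Int, resultHandler: (Int, U) => Unit) {
  private val finished = Array.fill(numPartitions)(false)   // plays the role of job.finished
  private var numFinished = 0

  // Called once per CompletionEvent; outputId corresponds to the partition id.
  def taskSucceeded(outputId: Int, result: U): Unit = synchronized {
    if (!finished(outputId)) {          // same check as if (!job.finished(rt.outputId))
      finished(outputId) = true
      numFinished += 1
      resultHandler(outputId, result)   // applied at most once per partition
    }
    // A duplicate (speculative) completion for the same outputId falls through
    // without touching the result again.
  }

  def isDone: Boolean = synchronized { numFinished == numPartitions }
}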


Maybe I'm wrong… I just went through the code roughly, so feel free to correct me.

Best,


--  
Nan Zhu


On Tuesday, July 15, 2014 at 1:55 PM, Mingyu Kim wrote:

> Hi all,
>  
> I was curious about the details of Spark speculation. So, my understanding is 
> that, when “speculated” tasks are newly scheduled on other machines, the 
> original tasks are still running until the entire stage completes. This seems 
> to leave some room for duplicated work because some Spark actions are not 
> idempotent. For example, it may be counting a partition twice in case of 
> RDD.count or may be writing a partition to HDFS twice in case of RDD.save*(). 
> How does it prevent this kind of duplicated work?
>  
> Mingyu  
>  
>  

