Yeah, this is handled by the "commit" call of the FileOutputFormat's output 
committer. In general, Hadoop OutputFormats have a concept called "committing" 
the output, which you should do only once per partition. In the file-based 
ones, committing does an atomic rename to make sure that the final output is a 
complete file.

Matei

On Jul 15, 2014, at 2:49 PM, Tathagata Das <[email protected]> wrote:

> The way HDFS file writing works, at a high level, is that each attempt to 
> write a partition starts writing to a unique temporary file (say, something 
> like targetDirectory/_temp/part-XXXXX_attempt-YYYY). If the write completes 
> successfully, the temporary file is moved to the final location (say, 
> targetDirectory/part-XXXXX). If, due to speculative execution, the file 
> already exists at the final location, then the move is skipped. Or it's 
> overwritten; I forget the implementation. Either way, all attempts to write 
> the same partition will always write the same data to the temp file 
> (assuming the Spark transformation generating the data is deterministic and 
> idempotent). And once one attempt succeeds, the final file will have that 
> same data. Hence, writing to HDFS / S3 is idempotent.
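A minimal sketch of the temp-file-then-rename pattern described above, in Scala on a local filesystem. It is an illustration under stated assumptions, not Hadoop's FileOutputCommitter: the object and method names are hypothetical, the paths follow the "_temp/part-XXXXX_attempt-YYYY" naming used above, the "skip the move if the final file already exists" branch is just one of the two behaviours mentioned, and a JVM lock stands in for whatever coordination the real system uses to commit each partition only once.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical illustration of "write to a per-attempt temp file, then commit by rename".
// Not Hadoop's FileOutputCommitter; names and directory layout are assumptions.
object CommitSketch {
  private def finalName(partition: Int): String = f"part-$partition%05d"

  // Each attempt writes to its own temporary file, so concurrent attempts never collide.
  def writeAttempt(outDir: Path, partition: Int, attempt: Int, lines: Seq[String]): Path = {
    val tempDir = Files.createDirectories(outDir.resolve("_temp"))
    val tmp = tempDir.resolve(f"${finalName(partition)}_attempt-$attempt%04d")
    Files.write(tmp, lines.mkString("\n").getBytes(StandardCharsets.UTF_8))
    tmp
  }

  // Commit at most once per partition: the first attempt to get here renames its temp
  // file into place atomically; later attempts find the final file and discard theirs.
  // `synchronized` is an assumed stand-in for the real commit coordination.
  def commit(outDir: Path, partition: Int, tmp: Path): Boolean = synchronized {
    val target = outDir.resolve(finalName(partition))
    if (Files.exists(target)) {
      Files.delete(tmp) // another attempt already committed; drop this attempt's output
      false
    } else {
      Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE)
      true
    }
  }
}
```

For example, writing partition 3 from attempts 0 and 1 and committing both leaves a single part-00003: the first commit returns true, the second returns false and deletes its temp file. Because the rename only changes the file's name, readers never see a half-written part file.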
> 
> Now, this logic is already implemented in Hadoop's MapReduce code, and Spark 
> just uses it directly.
> 
> TD
> 
> 
> On Tue, Jul 15, 2014 at 2:33 PM, Mingyu Kim <[email protected]> wrote:
> Thanks for the explanation, guys.
> 
> I looked into the saveAsHadoopFile implementation a little bit. If you look at 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
> at line 843, the HDFS write happens during per-partition processing, not 
> during result handling, so I have a feeling that it might be writing 
> multiple times. This may be fine if both tasks for the same partition 
> complete, because each will simply overwrite the output partition with the 
> same content, but it could be an issue if one of the tasks completes and the 
> other is still in the middle of writing the partition when the entire stage 
> completes. Can someone explain this?
> 
> Bertrand, I’m slightly confused about your comment. So, is it the case that 
> HDFS will handle the writes as a temp file write followed by an atomic move, 
> so the concern I had above is handled at the HDFS level?
> 
> Mingyu
> 
> From: Bertrand Dechoux <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Tuesday, July 15, 2014 at 1:22 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: How does Spark speculation prevent duplicated work?
> 
> I haven't looked at the implementation, but what you would do with any 
> filesystem is write to a file inside the task's workspace directory. Then 
> only the task attempt that should be kept moves its file to the final path; 
> the other attempts are simply discarded. For most filesystems (and that's the 
> case for HDFS), a 'move' is a very simple and fast action because only the 
> "full path/name" of the file changes, not its content or where that content 
> is physically stored.
> 
> Speculative execution happens in Hadoop MapReduce, and Spark has the same 
> concept. As long as you apply functions with no side effects (i.e. the only 
> impact is the returned result), you just need to ignore the results from 
> additional attempts of the same task/operator.
> 
> Bertrand Dechoux
> 
> 
> On Tue, Jul 15, 2014 at 9:34 PM, Andrew Ash <[email protected]> wrote:
> Hi Nan,
> 
> Great digging in -- that makes sense to me for when a job is producing some 
> output handled by Spark like a .count or .distinct or similar.
> 
> For the other part of the question, I'm also interested in side effects like 
> an HDFS disk write.  If one task is writing to an HDFS path and another task 
> starts up, wouldn't it also attempt to write to the same path?  How is that 
> de-conflicted?
> 
> 
> On Tue, Jul 15, 2014 at 3:02 PM, Nan Zhu <[email protected]> wrote:
> Hi, Mingyu,
> 
> According to my understanding:
> 
> Spark processes the result generated from each partition by passing it to 
> the resultHandler (SparkContext.scala L1056).
> 
> This resultHandler usually just puts the result into a driver-side array 
> whose length is always partitions.size.
> 
> This design effectively ensures that actions are idempotent.
> 
> For example, count is implemented as
> 
> def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
> 
> Even if the task for a partition is executed more than once, the result 
> stored in the array is the same.
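A minimal Scala model of that idea, with hypothetical names (CountCollector and its methods) rather than Spark's own classes: the driver keeps one slot per partition, so a speculative duplicate overwrites its partition's slot instead of being counted twice.

```scala
// Hypothetical model of a driver-side result array; not Spark's actual code.
// One slot per partition: each result is stored at its partition's index, so a
// duplicate attempt for the same partition overwrites rather than accumulates.
final class CountCollector(numPartitions: Int) {
  private val results = new Array[Long](numPartitions)

  def resultHandler(partitionId: Int, result: Long): Unit =
    results(partitionId) = result

  // Analogous to the `.sum` applied to runJob's returned array in count().
  def total: Long = results.sum
}

object CountCollectorDemo {
  def main(args: Array[String]): Unit = {
    val collector = new CountCollector(3)
    collector.resultHandler(0, 10L)
    collector.resultHandler(1, 20L)
    collector.resultHandler(1, 20L) // speculative duplicate of partition 1
    collector.resultHandler(2, 5L)
    println(collector.total) // prints 35
  }
}
```

This only holds when duplicate attempts produce the same value, which is why the argument assumes deterministic tasks.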
> 
> 
> 
> At the same time, I think the Spark implementation ensures that the operation 
> applied to the return value of SparkContext.runJob is not triggered again 
> when the duplicate tasks finish.
> 
> This is because, when a task finishes, the code execution path is 
> TaskSetManager.handleSuccessfulTask -> DAGScheduler.taskEnded.
> 
> taskEnded triggers the CompletionEvent message handler, where DAGScheduler 
> checks if (!job.finished(rt.outputId)), and rt.outputId identifies the 
> partition's output.
> 
> So even if a duplicate task produces a CompletionEvent message, the handler 
> finds that job.finished(rt.outputId) is already true.
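A simplified Scala model of that check. The names finished and outputId follow the description above, but the class and method names are hypothetical and this is not the DAGScheduler source: the result handler runs only for the first completion of each output, and later duplicates are dropped.

```scala
// Hypothetical model of "ignore completions for outputs that already finished".
final class ActiveJobModel(numOutputs: Int) {
  val finished = new Array[Boolean](numOutputs)
  private var numFinished = 0

  // Called for every successful task completion, including speculative duplicates.
  def onCompletionEvent(outputId: Int, result: Long, resultHandler: (Int, Long) => Unit): Unit = {
    if (!finished(outputId)) {
      finished(outputId) = true
      numFinished += 1
      resultHandler(outputId, result)
    }
    // Otherwise: a duplicate attempt for an output that already finished; ignored.
  }

  def jobDone: Boolean = numFinished == numOutputs
}

object ActiveJobModelDemo {
  def main(args: Array[String]): Unit = {
    val job = new ActiveJobModel(2)
    var handled = 0
    val handler: (Int, Long) => Unit = (_, _) => handled += 1
    job.onCompletionEvent(0, 7L, handler)
    job.onCompletionEvent(0, 7L, handler) // duplicate, ignored
    job.onCompletionEvent(1, 3L, handler)
    println(s"handled=$handled done=${job.jobDone}") // prints handled=2 done=true
  }
}
```

Note that this guards only the driver-side result handling; side effects a task performs itself (such as writing a file) still need something like the commit step discussed elsewhere in the thread.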
> 
> 
> Maybe I'm wrong; I only went through the code roughly, so corrections are welcome.
> 
> Best,
> 
> 
> -- 
> Nan Zhu
> 
> On Tuesday, July 15, 2014 at 1:55 PM, Mingyu Kim wrote:
> 
>> Hi all,
>> 
>> I was curious about the details of Spark speculation. My understanding is 
>> that when “speculated” tasks are newly scheduled on other machines, the 
>> original tasks keep running until the entire stage completes. This seems to 
>> leave some room for duplicated work, because some Spark actions are not 
>> idempotent. For example, it may count a partition twice in the case of 
>> RDD.count, or write a partition to HDFS twice in the case of RDD.save*(). 
>> How does it prevent this kind of duplicated work?
>> 
>> Mingyu
>> 
> 
> 
> 
> 
