Yeah, this is handled by the "commit" call of the FileOutputFormat. In general, Hadoop OutputFormats have a concept called "committing" the output, which should happen only once per partition. The file-based ones do an atomic rename on commit to make sure that the final output is a complete file.
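
As a rough illustration of the write-then-rename pattern described above, here is a minimal sketch on a local filesystem using plain java.nio. It is not Hadoop's actual commit code: the object CommitSketch and the helpers writeAttempt and commit are hypothetical names, and the file layout only mirrors the targetDirectory/_temp/part-XXXXX_attempt-YYYY example from this thread.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical sketch, NOT Hadoop's implementation: one temp file per task
// attempt, published to its final name by a single rename.
object CommitSketch {

  // Each attempt writes to its own temp file, so concurrent attempts for the
  // same partition never touch each other's output.
  def writeAttempt(targetDir: Path, partition: Int, attempt: Int, data: String): Path = {
    val tempDir = Files.createDirectories(targetDir.resolve("_temp"))
    val temp = tempDir.resolve(f"part-$partition%05d_attempt-$attempt%04d")
    Files.write(temp, data.getBytes(StandardCharsets.UTF_8))
    temp
  }

  // Publish the attempt's output. Returns true if this attempt was committed,
  // false if another attempt had already committed the partition.
  // Assumption: callers are serialized; the exists-then-move check below is
  // not race-free on its own.
  def commit(targetDir: Path, partition: Int, temp: Path): Boolean = {
    val dest = targetDir.resolve(f"part-$partition%05d")
    if (Files.exists(dest)) {
      Files.deleteIfExists(temp) // discard the losing attempt's output
      false
    } else {
      // Readers see either no file or the complete file, never a partial one.
      Files.move(temp, dest, StandardCopyOption.ATOMIC_MOVE)
      true
    }
  }
}
```

With two attempts for partition 0 writing the same data, the first commit publishes part-00000 and the second is discarded, so the final file is complete whichever attempt finishes first.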
Matei

On Jul 15, 2014, at 2:49 PM, Tathagata Das <[email protected]> wrote:

> The way the HDFS file writing works at a high level is that each attempt to
> write a partition to a file starts writing to a unique temporary file (say,
> something like targetDirectory/_temp/part-XXXXX_attempt-YYYY). If the writing
> into the file successfully completes, then the temporary file is moved to the
> final location (say, targetDirectory/part-XXXXX). If, due to speculative
> execution, the file already exists in the final intended location, then the
> move is avoided. Or it's overwritten; I forget the implementation. Either way,
> all attempts to write the same partition will always write the same data to
> the temp file (assuming the Spark transformation generating the data is
> deterministic and idempotent). And once one attempt is successful, the final
> file will have the same data. Hence, writing to HDFS / S3 is idempotent.
>
> Now this logic is already implemented within Hadoop's MapReduce logic,
> and Spark just uses it directly.
>
> TD
>
>
> On Tue, Jul 15, 2014 at 2:33 PM, Mingyu Kim <[email protected]> wrote:
> Thanks for the explanation, guys.
>
> I looked into the saveAsHadoopFile implementation a little bit. If you see
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
> at line 843, the HDFS write happens at per-partition processing, not at the
> result handling, so I have a feeling that it might be writing multiple times.
> This may be fine if both tasks for the same partition complete because it
> will simply overwrite the output partition with the same content, but this
> could be an issue if one of the tasks completes and the other is in the
> middle of writing the partition by the time the entire stage completes. Can
> someone explain this?
>
> Bertrand, I’m slightly confused about your comment. So, is it the case that
> HDFS will handle the writes as a temp file write followed by an atomic move,
> so the concern I had above is handled at the HDFS level?
>
> Mingyu
>
> From: Bertrand Dechoux <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Tuesday, July 15, 2014 at 1:22 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: How does Spark speculation prevent duplicated work?
>
> I haven't looked at the implementation but what you would do with any
> filesystem is write to a file inside the workspace directory of the task. And
> then only the attempt of the task that should be kept will perform a move to
> the final path. The other attempts are simply discarded. For most filesystems
> (and that's the case for HDFS), a 'move' is a very simple and fast action
> because only the "full path/name" of the file changes but not its content or
> where this content is physically stored.
>
> Speculative execution happens in Hadoop MapReduce. Spark has the same
> concept. As long as you apply functions with no side effects (i.e. the only
> impact is the returned results), then you just need to not take into account
> results from additional attempts of the same task/operator.
>
> Bertrand Dechoux
>
>
> On Tue, Jul 15, 2014 at 9:34 PM, Andrew Ash <[email protected]> wrote:
> Hi Nan,
>
> Great digging in -- that makes sense to me for when a job is producing some
> output handled by Spark like a .count or .distinct or similar.
>
> For the other part of the question, I'm also interested in side effects like
> an HDFS disk write. If one task is writing to an HDFS path and another task
> starts up, wouldn't it also attempt to write to the same path? How is that
> de-conflicted?
>
>
> On Tue, Jul 15, 2014 at 3:02 PM, Nan Zhu <[email protected]> wrote:
> Hi, Mingyu,
>
> According to my understanding,
>
> Spark processes the result generated from each partition by passing it to
> resultHandler (SparkContext.scala L1056)
>
> This resultHandler usually just puts the result in a driver-side array, the
> length of which is always partitions.size
>
> this design effectively ensures that the actions are idempotent
>
> e.g. the count is implemented as
>
> def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
>
> even if the task for a partition is executed more than once, the result put
> in the array is the same
>
>
>
> At the same time, I think the Spark implementation ensures that the operation
> applied on the return value of SparkContext.runJob will not be triggered when
> the duplicate tasks are finished
>
> Because,
>
>
> when a task is finished, the code execution path is
> TaskSetManager.handleSuccessfulTask -> DAGScheduler.taskEnded
>
> in taskEnded, it will trigger the CompletionEvent message handler, where
> DAGScheduler will check if (!job.finished(rt.outputId)) and rt.outputId is
> the partition id
>
> so even if the duplicate task triggers a CompletionEvent message, it will find
> that job.finished(rt.outputId) is already true
>
>
> Maybe I was wrong… just went through the code roughly, welcome to correct me
>
> Best,
>
>
> --
> Nan Zhu
>
> On Tuesday, July 15, 2014 at 1:55 PM, Mingyu Kim wrote:
>
>> Hi all,
>>
>> I was curious about the details of Spark speculation. So, my understanding
>> is that, when “speculated” tasks are newly scheduled on other machines, the
>> original tasks are still running until the entire stage completes. This
>> seems to leave some room for duplicated work because some Spark actions are
>> not idempotent. For example, it may be counting a partition twice in case of
>> RDD.count or may be writing a partition to HDFS twice in case of
>> RDD.save*(). How does it prevent this kind of duplicated work?
>>
>> Mingyu
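
To make the driver-side argument in Nan Zhu's message concrete, here is a minimal sketch of the idea: one result slot per partition plus a finished flag, so a second completion for the same partition is ignored. This is not Spark's scheduler code. The names SpeculationSketch and JobState are hypothetical, and taskEnded here is a stand-in that only borrows its name from the DAGScheduler.taskEnded path mentioned in the thread.

```scala
// Hypothetical sketch, NOT Spark's scheduler: a driver-side job state that
// accepts exactly one result per partition.
object SpeculationSketch {

  final class JobState(numPartitions: Int) {
    // One flag and one result slot per partition, as in the
    // "driver-side array of length partitions.size" described above.
    private val finished = Array.fill(numPartitions)(false)
    private val results = new Array[Long](numPartitions)

    // Returns true if this completion was accepted, false if the partition
    // had already finished (e.g. a speculative duplicate reporting late).
    def taskEnded(partition: Int, result: Long): Boolean = {
      if (finished(partition)) {
        false
      } else {
        finished(partition) = true
        results(partition) = result
        true
      }
    }

    // Analogue of summing the per-partition sizes in count().
    def total: Long = results.sum
  }
}
```

If partition 0 reports a size of 3 twice and partition 1 reports 4 once, the second report for partition 0 is dropped and the total stays 7, which is why a duplicated task does not inflate a count.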
