We just wrote a couple of new committers for S3 that we're beginning to roll out to our Spark users. I've uploaded a repo with them if you'd like to take a look:
https://github.com/rdblue/s3committer

The main problem with the UUID approach is that data is live as soon as the S3 upload completes. That means readers can see partial results while a job is running, and those results may never be committed (since the files tagged with the UUID are removed if the job aborts).

You may also have a problem with partitioned task outputs. You'd have to encode the task ID in the output file name to identify which files to roll back if you need to revert a task, but with partitioned output you have to do a lot of directory listing to find all the files that need to be removed. That, or you risk duplicate data by not rolling back tasks.

The approach we took is to use the multi-part upload API to stage data from tasks without issuing the final call that completes the upload and makes the data live in S3. That way, we get distributed uploads without any visible data until the job committer runs. The job committer reads all of the pending uploads and commits them. If the job has failed, it can instead roll back the known uploads by aborting them, and the data is never visible to readers.

The flaw in this approach is that you can still get partial writes if the driver fails while running the job committer, but it covers the other cases.

We're working on getting users moved over to the new committers, so now seems like a good time to get a copy out to the community. Please let me know what you think.

rb

On Mon, Feb 20, 2017 at 10:14 AM, Matthew Schauer <matthew.scha...@ibm.com> wrote:

> I'm using Spark 1.5.2 and trying to append a data frame to partitioned Parquet directory in S3. It is known that the default `ParquetOutputCommitter` performs poorly in S3 because move is implemented as copy/delete, but the `DirectParquetOutputCommitter` is not safe to use for append operations in case of failure. I'm not very familiar with the intricacies of job/task committing/aborting, but I've written a rough replacement output committer that seems to work. It writes the results directly to their final locations and uses the write UUID to determine which files to remove in the case of a job/task abort. It seems to be a workable concept in the simple tests that I've tried. However, I can't make Spark use this alternate output committer because the changes in SPARK-8578 categorically prohibit any custom output committer from being used, even if it's safe for appending. I have two questions: 1) Does anyone more familiar with output committing have any feedback on my proposed "safe" append strategy, and 2) is there any way to circumvent the restriction on append committers without editing and recompiling Spark? Discussion of solutions in Spark 2.1 is also welcome.
>
> --
> View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Output-Committers-for-S3-tp21033.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>

--
Ryan Blue
Software Engineer
Netflix
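
For readers following the thread, here is a rough sketch of the staged multi-part upload commit described in the reply above. It is not the code from the s3committer repo. `FakeS3`, `run_task`, `commit_job`, and `abort_job` are hypothetical names, and the store is an in-memory stand-in so the example runs without AWS access. On real S3 the corresponding operations would be CreateMultipartUpload, UploadPart, CompleteMultipartUpload, and AbortMultipartUpload.

```python
import itertools
from dataclasses import dataclass, field


@dataclass
class FakeS3:
    """Hypothetical in-memory stand-in for an object store with multi-part uploads."""
    visible: dict = field(default_factory=dict)  # key -> bytes that readers can see
    pending: dict = field(default_factory=dict)  # upload_id -> (key, list of parts)
    _ids: itertools.count = field(default_factory=itertools.count)

    def start_upload(self, key):
        # Begin a multi-part upload; nothing is visible under `key` yet.
        upload_id = f"upload-{next(self._ids)}"
        self.pending[upload_id] = (key, [])
        return upload_id

    def upload_part(self, upload_id, data):
        # Stage one part; staged parts stay invisible to readers.
        self.pending[upload_id][1].append(data)

    def complete(self, upload_id):
        # The final call: assemble the parts and make the object live.
        key, parts = self.pending.pop(upload_id)
        self.visible[key] = b"".join(parts)

    def abort(self, upload_id):
        # Discard staged parts; the object never becomes visible.
        self.pending.pop(upload_id, None)


def run_task(s3, key, chunks):
    """Task side: upload all parts but do NOT complete the upload."""
    upload_id = s3.start_upload(key)
    for chunk in chunks:
        s3.upload_part(upload_id, chunk)
    return upload_id  # reported back so the job committer knows what is pending


def commit_job(s3, upload_ids):
    """Job committer: complete every pending upload.

    This loop is not atomic. If the driver dies partway through, some
    outputs are live and others are not, which is the flaw noted above.
    """
    for upload_id in upload_ids:
        s3.complete(upload_id)


def abort_job(s3, upload_ids):
    """Job abort: roll back by aborting every known pending upload."""
    for upload_id in upload_ids:
        s3.abort(upload_id)
```

In this sketch each task returns its upload ID, and the driver passes the collected IDs to either `commit_job` or `abort_job`. `visible` stays empty until `commit_job` runs, which is the property the UUID approach lacks.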