On 19 Jun 2017, at 16:55, Ryan Blue <rb...@netflix.com.INVALID<mailto:rb...@netflix.com.INVALID>> wrote:
I agree, the problem is that Spark is trying to be safe and avoid the direct committer. We also modify Spark to avoid its logic. We added a property that causes Spark to always use the output committer if the destination is in S3. I've changed Hadoop's FileOutputFormat to take a factory defining committers: any object store gets to write their own, etc. The FileOutputCommitter is now just one example, a rename()-based algorithm (two, really). Our committers are also slightly different and will get an AmazonS3 client from the destination file system using reflection. That way, it's always configured with the right credentials. The solution to set the credentials provider is another good one, thanks for sharing that. I think in the S3A version, the client is accessed by the committer using a package-private accessor. pretty much: it extends the helper class used in the output streams to get at the whole set of low level ops, while keeping the client hidden. And we have a special inconsistent client for testing now too. https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java Auth setup is from 1+ of: per-bucket config, fs.s3a base config, env vars, IAM metadata -which apparently gets throttled if you try hard enough. rb On Sat, Jun 17, 2017 at 10:04 AM, sririshindra <sririshin...@gmail.com<mailto:sririshin...@gmail.com>> wrote: Hi, as @Venkata krishnan pointed out spark does not allow DFOC when append mode is enabled. in the following class in spark, there is a small check org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol if (isAppend) { // If we are appending data to an existing dir, we will only use the output committer // associated with the file output format since it is not safe to use a custom // committer for appending. For example, in S3, direct parquet output committer may // leave partial data in the destination dir when the appending job fails. // See SPARK-8578 for more details. There's a fair amount parquet-specific bits of the spark output codechain; I've been avoiding it while I get CSV & ORC working so I'll be better placed to insert stuff. As it stands, the new hadoop committer plugin mech is working with the imported/migrated staging committer of Ryan, even when I turn on list inconsistency, which I consider a success. I do want more tests now: currently I'm picking up some of the subclasses of QueryTest & making them generic for any FileSystem rather than the local FS, then running with the new committers. Things work, but I need to think of more ways to really stress the commit protocol in ways where failures can be observed. However, the reasoning mentioned in the above comments is probably (maybe ryan or steve can confirm this assumption) not applicable to the Netflix commiter uploaded by Ryan blue. Because Ryan's commiter uses multipart upload. So either the whole file is live or nothing is. partial data will not be available for read. Whatever partial data that might have been uploaded to s3 by a failed job will be removed after 1 day (I think this the default in ryan's code. This can be modified using the following config (fs.s3a.multipart.purge.age -- 86400)) That's pretty much it. the files don't materialize to the final close, so you can write them to their destination, as long as you remember what you have outstanding and either commit or abort it. Commits of >1 file isn't atomic, but the time to commit is minimal, and indeed, the Hadoop MR protocol isn't quite as atomic as you think there. So I simply changed the code to if (true) { and rebuilt spark from scratch. everything is working well for me in my initial tests. There is one more problem I wanted to mention. For some reason, I am getting an authentication issue while using ryan's code. I made the following change inside ryan's code. I changed the findClinet method in S3MultiPartOutputCommiter.java (Ryan's repo) to the following protected Object findClient(Path path, Configuration conf) { System.out.println("findinClinet in S3MultipartOutPutCommiter"); //AWSCredentials //AmazonS3Client cli = new AmazonS3Client(new ProfileCredentialsProvider("/home/user/.aws/credentials", "default")); AmazonS3Client cli = new AmazonS3Client(new com.amazonaws.auth.EnvironmentVariableCredentialsProvider()); //new AmazonS3Client(); System.out.println(cli); return cli; //return new AmazonS3Client(new ProfileCredentialsProvider("/home/user/.aws/credentials", "default")); } We just have to set the s3 credentials in the ~/.bashrc file. Take a look at S3AUtils.createAWSCredentialProviderSet to see what goes on there; know that it's been/is undergoing some changes so you can't expect stability.