On 19 Jun 2017, at 16:55, Ryan Blue <rb...@netflix.com.INVALID> wrote:

I agree, the problem is that Spark is trying to be safe and avoid the direct
committer. We also modify Spark to bypass that logic: we added a property that
causes Spark to always use the output committer if the destination is in S3.

I've changed Hadoop's FileOutputFormat to take a factory defining committers:
any object store gets to write its own. The FileOutputCommitter is now just
one example, a rename()-based algorithm (two, really).
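
To make the factory idea concrete, here is a minimal sketch of what such a
pluggable committer factory can look like; the names below are illustrative,
not the exact API in the HADOOP-13786 branch:

    // Illustrative sketch only: the real factory API in the HADOOP-13786 branch
    // may differ. The idea is that FileOutputFormat asks a pluggable factory for
    // a committer instead of hard-coding FileOutputCommitter, so an object store
    // can supply its own implementation.
    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

    public interface OutputCommitterFactorySketch {

      /** Create the committer to use for a given destination path. */
      OutputCommitter createCommitter(Path outputPath, TaskAttemptContext context)
          throws IOException;

      /** Default factory: the classic rename()-based FileOutputCommitter. */
      class ClassicFactory implements OutputCommitterFactorySketch {
        @Override
        public OutputCommitter createCommitter(Path outputPath,
            TaskAttemptContext context) throws IOException {
          return new FileOutputCommitter(outputPath, context);
        }
      }
    }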


Our committers are also slightly different and will get an AmazonS3 client from 
the destination file system using reflection. That way, it's always configured 
with the right credentials. The solution to set the credentials provider is 
another good one, thanks for sharing that. I think in the S3A version, the 
client is accessed by the committer using a package-private accessor.
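
A minimal sketch of that reflection-based lookup (the accessor name used here
is an assumption; the real committer may use a different member):

    // Sketch of the "ask the destination filesystem for its client" approach.
    // The accessor name looked up below is an assumption; the point is that the
    // client comes from the FileSystem instance for the destination path, so it
    // is already configured with the right credentials and endpoint.
    import java.lang.reflect.Method;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class S3ClientLookup {
      private S3ClientLookup() {}

      public static Object findClient(Path path, Configuration conf) throws Exception {
        FileSystem fs = path.getFileSystem(conf);   // e.g. an S3AFileSystem
        // Reflection avoids compiling against S3A internals directly.
        Method getter = fs.getClass().getDeclaredMethod("getAmazonS3Client");
        getter.setAccessible(true);
        return getter.invoke(fs);                   // an AmazonS3 client instance
      }
    }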

pretty much: it extends the helper class used in the output streams to get at 
the whole set of low level ops, while keeping the client hidden. And we have a 
special inconsistent client for testing now too.

https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
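
For context, the low-level operations such a helper hands to a committer look
roughly like the sketch below; the names and signatures are illustrative, not
the actual WriteOperationHelper API (see the linked source for the real thing):

    // Roughly the shape of the low-level multipart operations a committer needs;
    // names and signatures are illustrative, not the actual WriteOperationHelper API.
    import java.io.File;
    import java.io.IOException;
    import java.util.List;

    interface LowLevelS3WriteOps {
      /** Start a multipart upload for a destination key; returns the upload id. */
      String initiateMultipartUpload(String destKey) throws IOException;

      /** Upload one part of a pending multipart upload; returns its etag. */
      String uploadPart(String destKey, String uploadId, int partNumber, File part)
          throws IOException;

      /** Make the object visible by completing the upload with all part etags. */
      void completeMultipartUpload(String destKey, String uploadId, List<String> etags)
          throws IOException;

      /** Abandon the upload: nothing becomes visible and the parts are discarded. */
      void abortMultipartUpload(String destKey, String uploadId) throws IOException;
    }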

Auth setup comes from one or more of: per-bucket config, the base fs.s3a config,
environment variables, and IAM instance metadata -- which apparently gets
throttled if you query it hard enough.
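
As a hedged illustration of those sources, a small sketch using the standard
s3a property names (the bucket name and key values are placeholders):

    // Sketch of the configuration sources above, using standard s3a property
    // names; the bucket name and key values are placeholders.
    import org.apache.hadoop.conf.Configuration;

    public class S3AAuthConfigExample {
      public static Configuration example() {
        Configuration conf = new Configuration();

        // Base fs.s3a credentials, applied to every bucket.
        conf.set("fs.s3a.access.key", "BASE_ACCESS_KEY");
        conf.set("fs.s3a.secret.key", "BASE_SECRET_KEY");

        // Per-bucket override: only used when talking to s3a://prod-logs/.
        conf.set("fs.s3a.bucket.prod-logs.access.key", "PROD_ACCESS_KEY");
        conf.set("fs.s3a.bucket.prod-logs.secret.key", "PROD_SECRET_KEY");

        // Environment variables (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY) and
        // EC2 IAM instance metadata are consulted by the default provider chain,
        // so nothing needs to be set here for those.
        return conf;
      }
    }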


rb

On Sat, Jun 17, 2017 at 10:04 AM, sririshindra <sririshin...@gmail.com> wrote:
Hi,

As @Venkata krishnan pointed out, Spark does not allow a DFOC when append mode
is enabled.

In the following class in Spark, there is a small check:

org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol


    if (isAppend) {
      // If we are appending data to an existing dir, we will only use the output committer
      // associated with the file output format since it is not safe to use a custom
      // committer for appending. For example, in S3, direct parquet output committer may
      // leave partial data in the destination dir when the appending job fails.
      // See SPARK-8578 for more details.


There's a fair amount of parquet-specific code in the Spark output chain; I've
been avoiding it while I get CSV & ORC working, so I'll be better placed to
insert stuff later.

As it stands, the new Hadoop committer plugin mechanism is working with Ryan's
imported/migrated staging committer, even when I turn on list inconsistency,
which I consider a success. I do want more tests now: currently I'm picking up
some of the subclasses of QueryTest & making them generic for any FileSystem
rather than the local FS, then running with the new committers.

Things work, but I need to think of more ways to really stress the commit 
protocol in ways where failures can be observed.


However, the reasoning in the above comments is probably (maybe Ryan or Steve
can confirm this assumption) not applicable to the Netflix committer uploaded
by Ryan Blue, because Ryan's committer uses multipart upload. Either the whole
file is live or nothing is; partial data will not be available for read.
Whatever partial data a failed job might have uploaded to S3 will be removed
after one day (I think this is the default in Ryan's code; it can be modified
using the fs.s3a.multipart.purge.age config, default 86400 seconds).

That's pretty much it. The files don't materialize until the final close, so
you can write them to their destination, as long as you remember what you have
outstanding and either commit or abort it. Committing more than one file isn't
atomic, but the time to commit is minimal, and indeed the Hadoop MR protocol
isn't quite as atomic as you might think there.
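
For reference, a small sketch of the purge settings mentioned above, using the
standard s3a property names:

    // The s3a settings referenced above: when purging is enabled, outstanding
    // multipart uploads older than the purge age are aborted when a filesystem
    // client starts up, so parts left behind by failed jobs don't linger forever.
    import org.apache.hadoop.conf.Configuration;

    public class MultipartPurgeExample {
      public static Configuration example() {
        Configuration conf = new Configuration();
        conf.setBoolean("fs.s3a.multipart.purge", true);
        // Age in seconds; 86400 = 24 hours, the default mentioned above.
        conf.setLong("fs.s3a.multipart.purge.age", 86400);
        return conf;
      }
    }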




So I simply changed the check to

     if (true) {

and rebuilt Spark from scratch. Everything is working well for me in my
initial tests.


There is one more problem I wanted to mention. For some reason, I was getting
an authentication issue while using Ryan's code, so I made the following
change: I changed the findClient method in S3MultipartOutputCommitter.java
(Ryan's repo) to the following:

  protected Object findClient(Path path, Configuration conf) {
      System.out.println("findClient in S3MultipartOutputCommitter");
      // Previously tried a profile-based provider:
      //   new AmazonS3Client(new ProfileCredentialsProvider("/home/user/.aws/credentials", "default"));
      AmazonS3Client cli = new AmazonS3Client(
          new com.amazonaws.auth.EnvironmentVariableCredentialsProvider());
      System.out.println(cli);
      return cli;
  }


We just have to set the S3 credentials (AWS_ACCESS_KEY_ID and
AWS_SECRET_ACCESS_KEY, which EnvironmentVariableCredentialsProvider reads) in
the ~/.bashrc file.



Take a look at S3AUtils.createAWSCredentialProviderSet to see what goes on
there; be aware that it has been (and still is) undergoing changes, so you
can't expect stability.
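
As an aside, one way to steer that provider chain without patching committer
code is the fs.s3a.aws.credentials.provider option, which takes a list of
provider classes; a small sketch:

    // Sketch: pointing s3a at an explicit list of credential providers, which
    // S3AUtils.createAWSCredentialProviderSet instantiates in order.
    import org.apache.hadoop.conf.Configuration;

    public class CredentialsProviderExample {
      public static Configuration example() {
        Configuration conf = new Configuration();
        conf.set("fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.EnvironmentVariableCredentialsProvider,"
                + "com.amazonaws.auth.InstanceProfileCredentialsProvider");
        return conf;
      }
    }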


