I updated the PR for SPARK-6352 to be more like SPARK-3595. I added a new setting, "spark.sql.parquet.output.committer.class", read from the Hadoop configuration, to allow custom implementations of ParquetOutputCommitter. Can someone take a look at the PR?
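For example, usage would look roughly like this (exact semantics are still under review in the PR; com.example.MyS3Committer is a hypothetical subclass of ParquetOutputCommitter, and the bucket paths are placeholders):

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("parquet-committer"))
    val sqlContext = new SQLContext(sc)

    // The new setting is read from the Hadoop configuration, so set it
    // there rather than on SparkConf.
    sc.hadoopConfiguration.set(
      "spark.sql.parquet.output.committer.class",
      "com.example.MyS3Committer")

    sqlContext.parquetFile("s3n://my-bucket/in")
      .saveAsParquetFile("s3n://my-bucket/out")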
On Mon, Mar 16, 2015 at 5:23 PM, Pei-Lun Lee <pl...@appier.com> wrote:

> Hi,
>
> I created a JIRA and PR for supporting an S3-friendly output committer for
> saveAsParquetFile:
> https://issues.apache.org/jira/browse/SPARK-6352
> https://github.com/apache/spark/pull/5042
>
> My approach is to add a DirectParquetOutputCommitter class in the spark-sql
> package and use a boolean config variable,
> spark.sql.parquet.useDirectParquetOutputCommitter, to choose between it and
> the default output committer.
> This may not be the smartest solution, but it works for me.
> Tested on Spark 1.1 and 1.3 with Hadoop 1.0.4.
>
> On Thu, Mar 5, 2015 at 4:32 PM, Aaron Davidson <ilike...@gmail.com> wrote:
>
>> Yes, unfortunately that direct dependency makes this injection much more
>> difficult for saveAsParquetFile.
>>
>> On Thu, Mar 5, 2015 at 12:28 AM, Pei-Lun Lee <pl...@appier.com> wrote:
>>
>>> Thanks for the DirectOutputCommitter example.
>>> However, I found it only works for saveAsHadoopFile. What about
>>> saveAsParquetFile?
>>> It looks like Spark SQL is using ParquetOutputCommitter, which is a
>>> subclass of FileOutputCommitter.
>>>
>>> On Fri, Feb 27, 2015 at 1:52 AM, Thomas Demoor
>>> <thomas.dem...@amplidata.com> wrote:
>>>
>>>> FYI, we're currently addressing this at the Hadoop level in
>>>> https://issues.apache.org/jira/browse/HADOOP-9565
>>>>
>>>> Thomas Demoor
>>>>
>>>> On Mon, Feb 23, 2015 at 10:16 PM, Darin McBeath
>>>> <ddmcbe...@yahoo.com.invalid> wrote:
>>>>
>>>>> Just to close the loop in case anyone runs into the same problem I had:
>>>>>
>>>>> By setting --hadoop-major-version=2 when using the ec2 scripts,
>>>>> everything worked fine.
>>>>>
>>>>> Darin.
>>>>>
>>>>> ----- Original Message -----
>>>>> From: Darin McBeath <ddmcbe...@yahoo.com.INVALID>
>>>>> To: Mingyu Kim <m...@palantir.com>; Aaron Davidson <ilike...@gmail.com>
>>>>> Cc: "u...@spark.apache.org" <u...@spark.apache.org>
>>>>> Sent: Monday, February 23, 2015 3:16 PM
>>>>> Subject: Re: Which OutputCommitter to use for S3?
>>>>>
>>>>> Thanks. I think my problem might actually be the other way around.
>>>>>
>>>>> I'm compiling with Hadoop 2, but when I start up Spark using the ec2
>>>>> scripts, I don't specify a --hadoop-major-version, and the default is 1.
>>>>> I'm guessing that if I make that a 2, it might work correctly. I'll try
>>>>> it and post a response.
>>>>>
>>>>> ----- Original Message -----
>>>>> From: Mingyu Kim <m...@palantir.com>
>>>>> To: Darin McBeath <ddmcbe...@yahoo.com>; Aaron Davidson
>>>>> <ilike...@gmail.com>
>>>>> Cc: "u...@spark.apache.org" <u...@spark.apache.org>
>>>>> Sent: Monday, February 23, 2015 3:06 PM
>>>>> Subject: Re: Which OutputCommitter to use for S3?
>>>>>
>>>>> Cool, we will start from there. Thanks, Aaron and Josh!
>>>>>
>>>>> Darin, it's likely because the DirectOutputCommitter is compiled with
>>>>> Hadoop 1 classes and you're running it with Hadoop 2.
>>>>> org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and
>>>>> it became an interface in Hadoop 2.
>>>>>
>>>>> Mingyu
>>>>>
>>>>> On 2/23/15, 11:52 AM, "Darin McBeath" <ddmcbe...@yahoo.com.INVALID>
>>>>> wrote:
>>>>>
>>>>>> Aaron, thanks for the class. Since I'm currently writing Java-based
>>>>>> Spark applications, I tried converting your class to Java (it seemed
>>>>>> pretty straightforward).
>>>>>>
>>>>>> I set up the use of the class as follows:
>>>>>>
>>>>>> SparkConf conf = new SparkConf()
>>>>>>     .set("spark.hadoop.mapred.output.committer.class",
>>>>>>          "com.elsevier.common.DirectOutputCommitter");
>>>>>>
>>>>>> And I then try to save a file to S3 (which I believe should use the
>>>>>> old Hadoop APIs):
>>>>>>
>>>>>> JavaPairRDD<Text, Text> newBaselineRDDWritable =
>>>>>>     reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
>>>>>> newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
>>>>>>     Text.class, Text.class, SequenceFileOutputFormat.class,
>>>>>>     org.apache.hadoop.io.compress.GzipCodec.class);
>>>>>>
>>>>>> But I get the following error message:
>>>>>>
>>>>>> Exception in thread "main" java.lang.IncompatibleClassChangeError:
>>>>>> Found class org.apache.hadoop.mapred.JobContext, but interface was
>>>>>> expected
>>>>>>     at com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.java:68)
>>>>>>     at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
>>>>>>     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1075)
>>>>>>     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
>>>>>>     at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:902)
>>>>>>     at org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:771)
>>>>>>     at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)
>>>>>>
>>>>>> In my class, JobContext is an interface of type
>>>>>> org.apache.hadoop.mapred.JobContext.
>>>>>>
>>>>>> Is there something obvious that I might be doing wrong (or messed up
>>>>>> in the translation from Scala to Java), or something I should look
>>>>>> into? I'm using Spark 1.2 with Hadoop 2.4.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> Darin.
>>>>>>
>>>>>> ________________________________
>>>>>>
>>>>>> From: Aaron Davidson <ilike...@gmail.com>
>>>>>> To: Andrew Ash <and...@andrewash.com>
>>>>>> Cc: Josh Rosen <rosenvi...@gmail.com>; Mingyu Kim <m...@palantir.com>;
>>>>>> "u...@spark.apache.org" <u...@spark.apache.org>; Aaron Davidson
>>>>>> <aa...@databricks.com>
>>>>>> Sent: Saturday, February 21, 2015 7:01 PM
>>>>>> Subject: Re: Which OutputCommitter to use for S3?
>>>>>>
>>>>>> Here is the class:
>>>>>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>>>>>
>>>>>> You can use it by setting "mapred.output.committer.class" in the
>>>>>> Hadoop configuration (or "spark.hadoop.mapred.output.committer.class"
>>>>>> in the Spark configuration). Note that this only works for the old
>>>>>> Hadoop APIs; I believe the new Hadoop APIs strongly tie the committer
>>>>>> to the output format (so FileOutputFormat always uses
>>>>>> FileOutputCommitter), which makes this fix more difficult to apply.
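A Scala sketch of the configuration route Aaron describes, for reference (this assumes a DirectOutputCommitter implementation on the classpath, compiled against the Hadoop version in use; sc is an existing SparkContext):

    // Same effect as Darin's SparkConf setting above, applied directly to
    // the Hadoop configuration; note this covers the old mapred API only.
    sc.hadoopConfiguration.set("mapred.output.committer.class",
      "com.elsevier.common.DirectOutputCommitter")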
>>>>>>
>>>>>> On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash <and...@andrewash.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Josh, is that class something you guys would consider open sourcing,
>>>>>>> or would you rather the community step up and create an
>>>>>>> OutputCommitter implementation optimized for S3?
>>>>>>>
>>>>>>> On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen <rosenvi...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> We (Databricks) use our own DirectOutputCommitter implementation,
>>>>>>>> which is a couple tens of lines of Scala code. The class would
>>>>>>>> almost entirely be a no-op, except we took some care to properly
>>>>>>>> handle the _SUCCESS file.
>>>>>>>>
>>>>>>>> On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim <m...@palantir.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I didn't get any response. It'd be really appreciated if anyone
>>>>>>>>> using a special OutputCommitter for S3 can comment on this!
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Mingyu
>>>>>>>>>
>>>>>>>>> From: Mingyu Kim <m...@palantir.com>
>>>>>>>>> Date: Monday, February 16, 2015 at 1:15 AM
>>>>>>>>> To: "u...@spark.apache.org" <u...@spark.apache.org>
>>>>>>>>> Subject: Which OutputCommitter to use for S3?
>>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> The default OutputCommitter used by RDDs, which is
>>>>>>>>> FileOutputCommitter, seems to require moving files at the commit
>>>>>>>>> step, which is not a constant-time operation on S3, as discussed in
>>>>>>>>> http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3C543E33FA.2000802@entropy.be%3E
>>>>>>>>> People seem to develop their own NullOutputCommitter implementation
>>>>>>>>> or use DirectFileOutputCommitter (as mentioned in SPARK-3595), but
>>>>>>>>> I wanted to check if there is a de facto standard, publicly
>>>>>>>>> available OutputCommitter to use for S3 in conjunction with Spark.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Mingyu
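A rough sketch of the kind of no-op committer Josh describes, written against the old org.apache.hadoop.mapred API. This is illustrative only, not the actual Databricks class; the _SUCCESS handling simply mirrors what FileOutputCommitter does on job commit:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapred.{FileOutputFormat, JobContext, OutputCommitter, TaskAttemptContext}

    class DirectOutputCommitter extends OutputCommitter {
      // Tasks write straight to the final output location, so there is
      // nothing to set up, commit, or roll back per task.
      override def setupJob(jobContext: JobContext): Unit = {}
      override def setupTask(taskContext: TaskAttemptContext): Unit = {}
      override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
      override def commitTask(taskContext: TaskAttemptContext): Unit = {}
      override def abortTask(taskContext: TaskAttemptContext): Unit = {}

      // On successful job completion, drop the _SUCCESS marker the way
      // FileOutputCommitter would, so downstream jobs can detect completion.
      override def commitJob(context: JobContext): Unit = {
        val conf = context.getJobConf
        val outputPath = FileOutputFormat.getOutputPath(conf)
        if (outputPath != null) {
          val fs = outputPath.getFileSystem(conf)
          fs.create(new Path(outputPath, "_SUCCESS")).close()
        }
      }
    }

As Mingyu notes above, compile it against the Hadoop version you actually run with, since org.apache.hadoop.mapred.JobContext changed from a class in Hadoop 1 to an interface in Hadoop 2.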