> On 9 Sep 2016, at 21:54, Srikanth <srikanth...@gmail.com> wrote:
>
> Hello,
>
> I'm trying to use DirectOutputCommitter for s3a in Spark 2.0. I've tried a
> few configs and none of them seem to work.
> Output always creates _temporary directory. Rename is killing performance.
> I read some notes about DirectOutputCommitter causing problems with
> speculation turned on. Was this option removed entirely?

Spark turns off any committer with the word "direct" in its name if speculation==true. Concurrency, you see.

Even on non-speculative execution, the trouble with the direct options is that executor/job failures can leave incomplete/inconsistent work around, and the things downstream wouldn't even notice.

There's work underway to address this, work which requires a consistent metadata store alongside S3 (HADOOP-13345: S3Guard).

For now: stay with the file output committer and set

  mapreduce.fileoutputcommitter.algorithm.version=2
  mapreduce.fileoutputcommitter.cleanup.skipped=true

Even better: use HDFS as the intermediate store for the work and only do a bulk upload to S3 at the end. There are sketches of both options after your quoted config below.

> val spark = SparkSession.builder()
>   .appName("MergeEntities")
>   .config("spark.sql.warehouse.dir", mergeConfig.getString("sparkSqlWarehouseDir"))
>   .config("fs.s3a.buffer.dir", "/tmp")
>   .config("spark.hadoop.mapred.output.committer.class", classOf[DirectOutputCommitter].getCanonicalName)
>   .config("mapred.output.committer.class", classOf[DirectOutputCommitter].getCanonicalName)
>   .config("mapreduce.use.directfileoutputcommitter", "true")
>   //.config("spark.sql.sources.outputCommitterClass", classOf[DirectOutputCommitter].getCanonicalName)
>   .getOrCreate()
>
> Srikanth
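As a minimal sketch of the "for now" option: the two properties above can be passed through to the Hadoop configuration with Spark's spark.hadoop.* prefix. The bucket and output path here are illustrative assumptions, not taken from your mail.

    import org.apache.spark.sql.SparkSession

    // Keep the default FileOutputCommitter, but use the v2 algorithm
    // (renames happen at task commit rather than in one big pass at
    // job commit) and skip deleting the _temporary directory afterwards.
    val spark = SparkSession.builder()
      .appName("MergeEntities")
      .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
      .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped", "true")
      .getOrCreate()

    // Illustrative write; any dataframe/dataset write picks up the settings.
    spark.range(10).write.parquet("s3a://my-bucket/output/merged")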
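And a sketch of the "even better" option, reusing the session from the sketch above; the HDFS staging path and the destination are again made up for illustration:

    // Stage the job output on HDFS, where the commit renames are cheap and atomic.
    val staging = "hdfs:///user/srikanth/staging/merged"
    spark.range(10).write.parquet(staging)

    // Once the job has committed, push the whole directory to S3 in one bulk copy, e.g.
    //   hadoop distcp hdfs:///user/srikanth/staging/merged s3a://my-bucket/output/merged

The upload then happens once, against completed files, so a failed or speculated task never leaves half-written output sitting in the bucket.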