> On 9 Sep 2016, at 21:54, Srikanth <srikanth...@gmail.com> wrote:
> 
> Hello,
> 
> I'm trying to use DirectOutputCommitter for s3a in Spark 2.0. I've tried a 
> few configs and none of them seem to work.
> Output always creates _temporary directory. Rename is killing performance.

> I read some notes about DirectOutputcommitter causing problems with 
> speculation turned on. Was this option removed entirely? 

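Spark turns off any committer with the word "direct" in its name if 
spark.speculation is true. It's a concurrency problem: speculative attempts 
of the same task would write to the same destination at the same time.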

Even in non-speculative execution, the trouble with the direct options is that 
executor/job failures can leave incomplete or inconsistent output behind, and 
things downstream wouldn't even notice.

There's work underway to address this, work which requires a consistent 
metadata store alongside S3 (HADOOP-13345: S3Guard).

For now: stay with the file output committer, with these options set:

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped=true
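
As a rough sketch, the two settings above could be passed through the same 
SparkSession builder you are already using. This assumes the "spark.hadoop." 
prefix, which Spark strips before copying the option into the Hadoop 
configuration; the object name is made up for illustration, and the master is 
assumed to come from spark-submit.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical wrapper object; only the two .config lines matter here.
object CommitterConfigSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("MergeEntities")
      // v2 algorithm: task commit moves files straight into the destination
      // directory, so job commit no longer does a second round of renames.
      .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
      // Skip deleting the _temporary directory at job commit.
      .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped", "true")
      .getOrCreate()

    // ... run the job here ...

    spark.stop()
  }
}
```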

Even better: use HDFS as the intermediate store for work, and only do a bulk 
upload to S3 at the end.
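
A minimal sketch of that pattern, assuming a cluster with HDFS available and 
s3a credentials already configured. The paths, bucket name and the 
spark.range placeholder data are all hypothetical. FileUtil.copy runs 
single-threaded in the driver, so for large outputs a parallel copy such as 
"hadoop distcp" is the more usual choice.

```scala
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.spark.sql.SparkSession

object HdfsThenUploadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MergeEntities").getOrCreate()

    // Hypothetical locations; replace with your own.
    val staging = new Path("hdfs:///tmp/merge-entities/output")
    val dest    = new Path("s3a://example-bucket/merge-entities/output")

    // 1. Write the job output to HDFS, where rename is cheap.
    //    spark.range is only a stand-in for the real DataFrame.
    spark.range(1000).write.mode("overwrite").parquet(staging.toString)

    // 2. Once the job has committed, copy the finished directory to S3.
    //    Assumes dest does not exist yet; if it does, the copy lands in a
    //    subdirectory of it instead.
    val conf  = spark.sparkContext.hadoopConfiguration
    val srcFs = staging.getFileSystem(conf)
    val dstFs = dest.getFileSystem(conf)
    val ok = FileUtil.copy(srcFs, staging, dstFs, dest, false, conf)
    require(ok, s"upload of $staging to $dest failed")

    spark.stop()
  }
}
```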

> 
>   val spark = SparkSession.builder()
>                 .appName("MergeEntities")
>                 .config("spark.sql.warehouse.dir", 
> mergeConfig.getString("sparkSqlWarehouseDir"))
>                 .config("fs.s3a.buffer.dir", "/tmp")
>                 .config("spark.hadoop.mapred.output.committer.class", 
> classOf[DirectOutputCommitter].getCanonicalName)
>                 .config("mapred.output.committer.class", 
> classOf[DirectOutputCommitter].getCanonicalName)
>                 .config("mapreduce.use.directfileoutputcommitter", "true")
>                 //.config("spark.sql.sources.outputCommitterClass", 
> classOf[DirectOutputCommitter].getCanonicalName)
>                 .getOrCreate()
> 
> Srikanth
