In fact, in close() of the HadoopOutputFormat the task is never committed, because this.fileOutputCommitter.needsTaskCommit(this.context) returns false (presumably the FileOutputCommitter finds no task output in its temporary directory, since the records are meant for Mongo):
    /**
     * commit the task by moving the output file out from the temporary directory.
     * @throws java.io.IOException
     */
    @Override
    public void close() throws IOException {
        this.recordWriter.close(new HadoopDummyReporter());
        if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
            this.fileOutputCommitter.commitTask(this.context);
        }
    }

Also, both close() and finalizeGlobal() use a FileOutputCommitter, and never the MongoOutputCommitter:

    @Override
    public void finalizeGlobal(int parallelism) throws IOException {
        try {
            JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
            FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();
            // finalize HDFS output format
            fileOutputCommitter.commitJob(jobContext);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

Could anyone have a look into that? A sketch of the fix I have in mind is at the bottom of this mail.

saluti,
Stefano

2015-07-22 15:53 GMT+02:00 Stefano Bortoli <bort...@okkam.it>:

> Debugging, it seems the commitTask method of the MongoOutputCommitter is
> never called. Is it possible that this 'bulk' approach of mongo-hadoop 1.4
> does not fit the task execution model of Flink?
>
> Any idea? Thanks a lot in advance.
>
> saluti,
> Stefano
>
> 2015-07-22 14:26 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:
>
>> Hi,
>>
>> I am trying to analyze and update a MongoDB collection with Apache Flink
>> 0.9.0, mongo-hadoop 1.4.0, and Hadoop 2.6.0.
>>
>> The process is fairly simple: the MongoInputFormat works smoothly, but
>> nothing is written back to the collection. The processing itself is fine,
>> because writeAsText produces the expected output. I am quite puzzled
>> because, debugging, I can see records being written to some temporary
>> directory.
>>
>> mapred.output.dir seems to serve only to output a file named _SUCCESS,
>> and if I do not set it, the job fails with:
>>
>> java.lang.IllegalArgumentException: Can not create a Path from a null string
>>         at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>>         at org.apache.hadoop.fs.Path.<init>(Path.java:135)
>>         at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:108)
>>         at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:186)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> Has anyone experienced something similar? Any hints on where to look?
>> Thanks a lot in advance!
>> saluti,
>> Stefano
>>
>> ====================================================
>> Configuration conf = new Configuration();
>> conf.set("mapred.output.dir", "/tmp/");
>> conf.set(MongoUtil.MONGO_INPUT_URI_PROPERTY, collectionsUri);
>> conf.set(MongoUtil.MONGO_OUTPUT_URI_PROPERTY, collectionsUri);
>>
>> Job job = Job.getInstance(conf);
>>
>> // create a MongoInputFormat, using a Hadoop input format wrapper
>> InputFormat<Object, BSONObject> mapreduceInputFormat =
>>         new MyMongoInputFormat<Object, BSONObject>();
>> HadoopInputFormat<Object, BSONObject> hdIf =
>>         new HadoopInputFormat<Object, BSONObject>(
>>                 mapreduceInputFormat, Object.class, BSONObject.class, job);
>>
>> // (input is the DataSet read from hdIf, e.g. env.createInput(hdIf))
>> DataSet<Tuple2<Text, BSONWritable>> fin = input
>>         .flatMap(new myFlatMapFunction()).setParallelism(16);
>>
>> MongoConfigUtil.setOutputURI(job.getConfiguration(), collectionsUri);
>>
>> fin.output(new HadoopOutputFormat<Text, BSONWritable>(
>>         new MongoOutputFormat<Text, BSONWritable>(), job));
>> // fin.writeAsText("/tmp/out", WriteMode.OVERWRITE);
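P.S. Here is a minimal sketch of the fix I have in mind, assuming the
mapreduce variant of HadoopOutputFormatBase (the stack trace points there).
The field names (recordWriter, context, configuration, mapreduceOutputFormat)
and the helper HadoopUtils.instantiateTaskAttemptContext are my reading of
the Flink code, not verified. The idea: rather than instantiating a
FileOutputCommitter, both methods ask the wrapped OutputFormat for its own
committer via getOutputCommitter(), so MongoOutputFormat would hand back its
MongoOutputCommitter and commitTask/commitJob would actually reach Mongo.

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;

    @Override
    public void close() throws IOException {
        try {
            // close the writer first, then commit through whatever committer
            // the wrapped format chooses (MongoOutputCommitter for Mongo)
            this.recordWriter.close(this.context);
            OutputCommitter committer =
                    this.mapreduceOutputFormat.getOutputCommitter(this.context);
            if (committer.needsTaskCommit(this.context)) {
                committer.commitTask(this.context);
            }
        } catch (InterruptedException e) {
            throw new IOException("task commit interrupted", e);
        }
    }

    @Override
    public void finalizeGlobal(int parallelism) throws IOException {
        try {
            JobContext jobContext =
                    HadoopUtils.instantiateJobContext(this.configuration, new JobID());
            TaskAttemptContext taskContext =
                    HadoopUtils.instantiateTaskAttemptContext(this.configuration,
                            new TaskAttemptID());
            // same idea at job level: no hard-coded FileOutputCommitter
            OutputCommitter committer =
                    this.mapreduceOutputFormat.getOutputCommitter(taskContext);
            committer.commitJob(jobContext);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

This mirrors how the Hadoop task runtime itself resolves the committer;
anything hard-wired to FileOutputCommitter will silently skip committers
that do not write to the filesystem, which would explain both the missing
commitTask call and the lone _SUCCESS file under mapred.output.dir.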