Yes, it does. :-) I have implemented it with both Hadoop1 and Hadoop2.
Essentially, I have extended the HadoopOutputFormat, reusing part of the code
of the HadoopOutputFormatBase, and set the MongoOutputCommitter to replace
the FileOutputCommitter.
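
As a rough illustration of that idea (a plain-Java sketch, not the actual Flink or mongo-hadoop code): the sink below receives its committer as a constructor argument instead of creating a file committer itself, so a store-backed committer actually gets its commitTask called. TaskCommitter, FileLikeCommitter, BufferingCommitter and CommittingSink are hypothetical stand-ins invented for this example; only the method names needsTaskCommit and commitTask mirror the Hadoop calls quoted in this thread.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a Hadoop output committer (only the calls discussed here). */
interface TaskCommitter {
    boolean needsTaskCommit();
    void commitTask() throws IOException;
}

/** Mimics a file-style committer that finds no temporary output, so nothing is committed. */
class FileLikeCommitter implements TaskCommitter {
    public boolean needsTaskCommit() { return false; }
    public void commitTask() { throw new IllegalStateException("nothing to commit"); }
}

/** Mimics a store-backed committer: staged records are flushed to the store on commit. */
class BufferingCommitter implements TaskCommitter {
    final List<String> pending = new ArrayList<>(); // records staged by the writer
    final List<String> store = new ArrayList<>();   // stands in for the target collection

    public boolean needsTaskCommit() { return !pending.isEmpty(); }
    public void commitTask() {
        store.addAll(pending);
        pending.clear();
    }
}

/** Sink whose committer is injected rather than hard-coded to the file variant. */
class CommittingSink {
    private final TaskCommitter committer;
    private final List<String> staging;

    CommittingSink(TaskCommitter committer, List<String> staging) {
        this.committer = committer;
        this.staging = staging;
    }

    void write(String record) { staging.add(record); }

    /** Same shape as the quoted close(): commit only if the committer asks for it. */
    void close() throws IOException {
        if (committer.needsTaskCommit()) {
            committer.commitTask();
        }
    }
}

class CommitterSketch {
    public static void main(String[] args) throws IOException {
        BufferingCommitter committer = new BufferingCommitter();
        CommittingSink sink = new CommittingSink(committer, committer.pending);
        sink.write("a");
        sink.write("b");
        sink.close();
        System.out.println(committer.store);
    }
}
```

With a BufferingCommitter, close() moves the staged records into store. With a FileLikeCommitter, the same close() commits nothing, which matches the behaviour described in the quoted messages below.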


>> Meanwhile, I have implemented a MongoHadoopOutputFormat that overrides the
>> open, close and finalizeGlobal methods.
>>> Thanks for reporting this, Stefano!
>>> It seems the HadoopOutputFormat wrapper is pretty much specialized for
>>> file output formats.
>>> Can you open an issue for that? Someone will need to look into this...
>>>> In fact, in close() of the HadoopOutputFormat, the task commit is skipped
>>>> because this.fileOutputCommitter.needsTaskCommit(this.context) returns
>>>> false.
>>>>     /**
>>>>      * commit the task by moving the output file out from the temporary directory.
>>>>      * @throws IOException
>>>>      */
>>>>     @Override
>>>>     public void close() throws IOException {
>>>>         this.recordWriter.close(new HadoopDummyReporter());
>>>>         if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
>>>>             this.fileOutputCommitter.commitTask(this.context);
>>>>         }
>>>>     }
>>>> Also, both close() and finalizeGlobal() use a FileOutputCommitter,
>>>> and never the MongoOutputCommitter:
>>>>     @Override
>>>>     public void finalizeGlobal(int parallelism) throws IOException {
>>>>         try {
>>>>             JobContext jobContext =
>>>>                     HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
>>>>             FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();
>>>>             // finalize HDFS output format
>>>>             fileOutputCommitter.commitJob(jobContext);
>>>>         } catch (Exception e) {
>>>>             throw new RuntimeException(e);
>>>>         }
>>>>     }
>>>> Can anyone have a look into that?
>>>>> While debugging, it seems the commitTask method of the MongoOutputCommitter
>>>>> is never called. Is it possible that this 'bulk' approach of mongo-hadoop
>>>>> 1.4 does not fit the task execution method of Flink?
>>>>> Any idea? Thanks a lot in advance.
>>>>>> Hi,
>>>>>> I am trying to analyze and update a MongoDB collection with Apache
>>>>>> Flink 0.9.0, Mongo Hadoop 1.4.0 and Hadoop 2.6.0.
>>>>>> The process is fairly simple, and the MongoInputFormat works
>>>>>> smoothly; however, it does not write back to the collection. The processing
>>>>>> itself works, because writeAsText produces the expected output. I am quite
>>>>>> puzzled because, while debugging, I can see it writes to some temporary directory.
>>>>>> The mapred.output.dir seems to serve just to output a file named
>>>>>> _SUCCESS, and if I do not set it, the job fails with
>>>>>> java.lang.IllegalArgumentException: Can not create a Path from a null
>>>>>> string
>>>>>>     at org.apache.hadoop.fs.Path.checkPathArg(
>>>>>>     at org.apache.hadoop.fs.Path.<init>(
>>>>>>     at
>>>>>>     at
>>>>>> org.apache.flink.runtime.operators.DataSinkTask.invoke(
>>>>>>     at
>>>>>>     at
>>>>>> Has anyone experienced something similar? Any hints on where to look?
>>>>>> Thanks a lot in advance!
>>>>>> ====================================================
>>>>>> Configuration conf = new Configuration();
>>>>>> conf.set("mapred.output.dir", "/tmp/");
>>>>>> conf.set(MongoUtil.MONGO_INPUT_URI_PROPERTY, collectionsUri);
>>>>>> conf.set(MongoUtil.MONGO_OUTPUT_URI_PROPERTY, collectionsUri);
>>>>>> Job job = Job.getInstance(conf);
>>>>>> // create a MongodbInputFormat, using a Hadoop input format wrapper
>>>>>> InputFormat<Object, BSONObject> mapreduceInputFormat =
>>>>>>         new MyMongoInputFormat<Object, BSONObject>();
>>>>>> HadoopInputFormat<Object, BSONObject> hdIf =
>>>>>>         new HadoopInputFormat<Object, BSONObject>(
>>>>>>                 mapreduceInputFormat, Object.class, BSONObject.class, job);
>>>>>> DataSet<Tuple2<Text, BSONWritable>> fin = input
>>>>>>         .flatMap(new myFlatMapFunction()).setParallelism(16);
>>>>>> MongoConfigUtil.setOutputURI(job.getConfiguration(), collectionsUri);
>>>>>> fin.output(new HadoopOutputFormat<Text, BSONWritable>(
>>>>>>         new MongoOutputFormat<Text, BSONWritable>(), job));
>>>>>> // fin.writeAsText("/tmp/out", WriteMode.OVERWRITE);

