Does this make the MongoHadoopOutputFormat work for you?

> Meanwhile, I have implemented a MongoHadoopOutputFormat that overrides the
> open, close and finalizeGlobal methods.
>> Thanks for reporting this, Stefano!
>> It seems the HadoopOutputFormat wrapper is pretty much specialized for
>> file output formats.
>> Can you open an issue for that? Someone will need to look into this...
>>> In fact, in close() of the HadoopOutputFormat, commitTask is skipped
>>> because this.fileOutputCommitter.needsTaskCommit(this.context) returns
>>> false:
>>>     /**
>>>      * Commit the task by moving the output file out of the temporary
>>>      * directory.
>>>      * @throws IOException
>>>      */
>>>     @Override
>>>     public void close() throws IOException {
>>>         this.recordWriter.close(new HadoopDummyReporter());
>>>         if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
>>>             this.fileOutputCommitter.commitTask(this.context);
>>>         }
>>>     }
>>> Also, both close() and finalizeGlobal() use a FileOutputCommitter,
>>> and never the MongoOutputCommitter:
>>>     @Override
>>>     public void finalizeGlobal(int parallelism) throws IOException {
>>>         try {
>>>             JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
>>>             FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();
>>>             // finalize HDFS output format
>>>             fileOutputCommitter.commitJob(jobContext);
>>>         } catch (Exception e) {
>>>             throw new RuntimeException(e);
>>>         }
>>>     }
>>> Can anyone have a look into that?
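
An inline note on the two methods quoted above: in Hadoop's mapreduce API,
OutputFormat declares getOutputCommitter(TaskAttemptContext), so a wrapper
could ask the wrapped format for its committer rather than constructing a
FileOutputCommitter itself. The sketch below illustrates only that delegation
idea. It is self-contained and uses stand-in types (Committer, Format, Wrapper
and RecordingCommitter are all hypothetical names, not Flink or Hadoop
classes), so it should not be read as the actual fix.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitterDelegationSketch {

    /** Stand-in for Hadoop's OutputCommitter (hypothetical, heavily simplified). */
    interface Committer {
        boolean needsTaskCommit();
        void commitTask();
        void commitJob();
    }

    /** Stand-in for an output format that knows which committer it needs. */
    interface Format {
        Committer getOutputCommitter();
    }

    /** Records which committer calls were made so the behavior can be inspected. */
    static class RecordingCommitter implements Committer {
        final List<String> calls = new ArrayList<>();
        public boolean needsTaskCommit() { calls.add("needsTaskCommit"); return true; }
        public void commitTask() { calls.add("commitTask"); }
        public void commitJob() { calls.add("commitJob"); }
    }

    /** Wrapper that delegates to the format's committer instead of a fixed one. */
    static class Wrapper {
        private final Format format;
        private Committer committer;

        Wrapper(Format format) { this.format = format; }

        // Ask the wrapped format which committer to use.
        void open() { committer = format.getOutputCommitter(); }

        // Same guard as in the quoted close(), but on the format's own committer.
        void close() {
            if (committer.needsTaskCommit()) {
                committer.commitTask();
            }
        }

        // Job-level commit also goes through the format's committer.
        void finalizeGlobal() { format.getOutputCommitter().commitJob(); }
    }

    public static void main(String[] args) {
        RecordingCommitter mongoLike = new RecordingCommitter();
        Wrapper wrapper = new Wrapper(() -> mongoLike);
        wrapper.open();
        wrapper.close();
        wrapper.finalizeGlobal();
        System.out.println(mongoLike.calls); // prints [needsTaskCommit, commitTask, commitJob]
    }
}
```

With the real classes, the equivalent would presumably be to obtain the
committer from the wrapped format in open() and to use the same source in
finalizeGlobal(). That is an assumption, not something confirmed in this
thread.
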
>>>> While debugging, it seems that the commitTask method of the
>>>> MongoOutputCommitter is never called. Is it possible that this 'bulk'
>>>> approach of mongo-hadoop 1.4 does not fit the task execution method of
>>>> Flink?
>>>> Any ideas? Thanks a lot in advance.
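
To make the symptom concrete: if the output format only stages records during
the task and moves them to the target when commitTask runs, then skipping
commitTask leaves the target untouched even though every write succeeded. The
following is a minimal sketch of that pattern under that assumption.
StagedSink and runTask are hypothetical names, and the in-memory lists merely
stand in for the temporary directory and the MongoDB collection; this is not
the mongo-hadoop implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class StagedCommitSketch {

    /** Hypothetical sink: records are staged first and reach the target only on commit. */
    static class StagedSink {
        private final List<String> staged = new ArrayList<>(); // stands in for the temp directory
        final List<String> target = new ArrayList<>();         // stands in for the collection

        void write(String record) { staged.add(record); }

        boolean needsTaskCommit() { return !staged.isEmpty(); }

        // Move everything that was staged into the target in one bulk step.
        void commitTask() {
            target.addAll(staged);
            staged.clear();
        }
    }

    /** Runs one task; when commit is false, the commit step is skipped. */
    static List<String> runTask(boolean commit, String... records) {
        StagedSink sink = new StagedSink();
        for (String r : records) {
            sink.write(r);
        }
        if (commit && sink.needsTaskCommit()) {
            sink.commitTask();
        }
        return sink.target;
    }

    public static void main(String[] args) {
        System.out.println(runTask(false, "a", "b")); // prints []
        System.out.println(runTask(true, "a", "b"));  // prints [a, b]
    }
}
```
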
>>>>> Hi,
>>>>> I am trying to analyze and update a MongoDB collection with Apache
>>>>> Flink 0.9.0, Mongo Hadoop 1.4.0 and Hadoop 2.6.0.
>>>>> The process is fairly simple, and the MongoInputFormat works smoothly;
>>>>> however, nothing is written back to the collection. The processing
>>>>> itself works, because writeAsText produces the expected output. I am
>>>>> quite puzzled, because while debugging I can see that it writes to some
>>>>> temporary directory.
>>>>> The mapred.output.dir setting seems to serve only to output a file named
>>>>> _SUCCESS, and if I do not set it, the job fails with
>>>>> java.lang.IllegalArgumentException: Can not create a Path from a null
>>>>> string
>>>>>     at org.apache.hadoop.fs.Path.checkPathArg(
>>>>>     at org.apache.hadoop.fs.Path.<init>(
>>>>>     at
>>>>>     at
>>>>> org.apache.flink.runtime.operators.DataSinkTask.invoke(
>>>>>     at
>>>>>     at
>>>>> Has anyone experienced something similar? Any hints on where to look?
>>>>> Thanks a lot in advance!
>>>>> ====================================================
>>>>> Configuration conf = new Configuration();
>>>>> conf.set("mapred.output.dir", "/tmp/");
>>>>> conf.set(MongoUtil.MONGO_INPUT_URI_PROPERTY, collectionsUri);
>>>>> conf.set(MongoUtil.MONGO_OUTPUT_URI_PROPERTY, collectionsUri);
>>>>> Job job = Job.getInstance(conf);
>>>>> // create a MongodbInputFormat, using a Hadoop input format wrapper
>>>>> InputFormat<Object, BSONObject> mapreduceInputFormat =
>>>>>         new MyMongoInputFormat<Object, BSONObject>();
>>>>> HadoopInputFormat<Object, BSONObject> hdIf =
>>>>>         new HadoopInputFormat<Object, BSONObject>(
>>>>>                 mapreduceInputFormat, Object.class, BSONObject.class, job);
>>>>> DataSet<Tuple2<Text, BSONWritable>> fin = input
>>>>>         .flatMap(new myFlatMapFunction()).setParallelism(16);
>>>>> MongoConfigUtil.setOutputURI(job.getConfiguration(), collectionsUri);
>>>>> fin.output(new HadoopOutputFormat<Text, BSONWritable>(
>>>>>         new MongoOutputFormat<Text, BSONWritable>(), job));
>>>>> //        fin.writeAsText("/tmp/out", WriteMode.OVERWRITE);

