Yes, it does. :-) I have implemented it with both Hadoop1 and Hadoop2.
Essentially, I have extended the HadoopOutputFormat, reusing part of the code
of the HadoopOutputFormatBase, and set the MongoOutputCommitter to replace
the FileOutputCommitter.
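
To illustrate why swapping the committer matters, here is a minimal, self-contained sketch. It does not use the real Flink, Hadoop, or mongo-hadoop classes: Committer, FileLikeCommitter, MongoLikeCommitter, SketchSink and CommitterSketch are hypothetical stand-ins, and the behaviour of each committer (for example, that the file-like one reports nothing to commit) is an assumption modelled on what was observed in this thread, not the actual library code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a Hadoop-style output committer (not the real API). */
interface Committer {
    boolean needsTaskCommit();
    void commitTask();
    void commitJob();
}

/** Models a file-based committer that finds no temporary files to move. */
class FileLikeCommitter implements Committer {
    private final List<String> log;
    FileLikeCommitter(List<String> log) { this.log = log; }
    public boolean needsTaskCommit() { return false; } // assumption: nothing was written to files
    public void commitTask() { log.add("file:commitTask"); }
    public void commitJob() { log.add("file:commitJob"); }
}

/** Models a store-specific committer that has buffered writes to flush. */
class MongoLikeCommitter implements Committer {
    private final List<String> log;
    MongoLikeCommitter(List<String> log) { this.log = log; }
    public boolean needsTaskCommit() { return true; } // assumption: buffered writes are pending
    public void commitTask() { log.add("mongo:commitTask"); }
    public void commitJob() { log.add("mongo:commitJob"); }
}

/** Sink wrapper whose close/finalize steps use whichever committer it is given. */
class SketchSink {
    private final Committer committer;
    SketchSink(Committer committer) { this.committer = committer; }

    /** Mirrors the close() logic quoted in this thread: commit only if needed. */
    void close() {
        if (committer.needsTaskCommit()) {
            committer.commitTask();
        }
    }

    /** Mirrors finalizeGlobal(): commit the job once all tasks are done. */
    void finalizeGlobal() {
        committer.commitJob();
    }
}

class CommitterSketch {
    /** Runs close() and finalizeGlobal() and returns the calls that were made. */
    static List<String> run(boolean useStoreCommitter) {
        List<String> log = new ArrayList<>();
        Committer committer = useStoreCommitter
                ? new MongoLikeCommitter(log)
                : new FileLikeCommitter(log);
        SketchSink sink = new SketchSink(committer);
        sink.close();
        sink.finalizeGlobal();
        return log;
    }

    public static void main(String[] args) {
        System.out.println("hard-coded file committer: " + run(false));
        System.out.println("store-specific committer:  " + run(true));
    }
}
```

With the file-like committer the task-level commit is skipped entirely, which matches the symptom reported below; with the store-specific committer both the task and the job commit are invoked.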

saluti,
Stefano



Stefano Bortoli, PhD

*ENS Technical Director *_______________________________________________
*OKKAM**Srl **- www.okkam.it <http://www.okkam.it/>*

*Email:* bort...@okkam.it

*Phone nr: +39 0461 1823912 *

*Headquarters:* Trento (Italy), Via Trener 8
*Registered office:* Trento (Italy), via Segantini 23

Confidentially notice. This e-mail transmission may contain legally
privileged and/or confidential information. Please do not read it if you
are not the intended recipient(S). Any use, distribution, reproduction or
disclosure by any other person is strictly prohibited. If you have received
this e-mail in error, please notify the sender and destroy the original
transmission and its attachments without reading or saving it in any manner.

2015-07-23 13:31 GMT+02:00 Stephan Ewen <se...@apache.org>:

> Does this make the MongoHadoopOutputFormat work for you?
>
> On Thu, Jul 23, 2015 at 12:44 PM, Stefano Bortoli <s.bort...@gmail.com>
> wrote:
>
>> https://issues.apache.org/jira/browse/FLINK-2394?filter=-2
>>
>> Meanwhile, I have implemented the MongoHadoopOutputFormat overriding
>> open, close and globalFinalize methods.
>>
>> saluti,
>> Stefano
>>
>> 2015-07-22 17:11 GMT+02:00 Stephan Ewen <se...@apache.org>:
>>
>>> Thanks for reporting this, Stefano!
>>>
>>> Seems like the HadoopOutputFormat wrapper is pretty much specialized for
>>> file output formats.
>>>
>>> Can you open an issue for that? Someone will need to look into this...
>>>
>>> On Wed, Jul 22, 2015 at 4:48 PM, Stefano Bortoli <s.bort...@gmail.com>
>>> wrote:
>>>
>>>> In fact, in close() of the HadoopOutputFormat, the commit is skipped because
>>>> this.fileOutputCommitter.needsTaskCommit(this.context) returns false, so
>>>> commitTask() is never called.
>>>>
>>>>     /**
>>>>      * commit the task by moving the output file out from the temporary directory.
>>>>      * @throws java.io.IOException
>>>>      */
>>>>     @Override
>>>>     public void close() throws IOException {
>>>>         this.recordWriter.close(new HadoopDummyReporter());
>>>>
>>>>         if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
>>>>             this.fileOutputCommitter.commitTask(this.context);
>>>>         }
>>>>     }
>>>>
>>>>
>>>> Also, both close() and finalizeGlobal() use a FileOutputCommitter,
>>>> and never the MongoOutputCommitter:
>>>>
>>>>     @Override
>>>>     public void finalizeGlobal(int parallelism) throws IOException {
>>>>
>>>>         try {
>>>>             JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
>>>>             FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();
>>>>
>>>>             // finalize HDFS output format
>>>>             fileOutputCommitter.commitJob(jobContext);
>>>>         } catch (Exception e) {
>>>>             throw new RuntimeException(e);
>>>>         }
>>>>     }
>>>>
>>>> Can anyone have a look into that?
>>>>
>>>> saluti,
>>>> Stefano
>>>>
>>>> 2015-07-22 15:53 GMT+02:00 Stefano Bortoli <bort...@okkam.it>:
>>>>
>>>>> While debugging, it seems the commitTask method of the MongoOutputCommitter
>>>>> is never called. Is it possible that the 'bulk' approach of mongo-hadoop
>>>>> 1.4 does not fit the task execution model of Flink?
>>>>>
>>>>> Any ideas? Thanks a lot in advance.
>>>>>
>>>>> saluti,
>>>>> Stefano
>>>>>
>>>>> 2015-07-22 14:26 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to analyze and update a MongoDB collection with Apache
>>>>>> Flink 0.9.0, Mongo Hadoop 1.4.0, and Hadoop 2.6.0.
>>>>>>
>>>>>> The process is fairly simple, and the MongoInputFormat works
>>>>>> smoothly; however, it does not write back to the collection. The
>>>>>> processing itself works, because writeAsText produces the expected
>>>>>> output. I am quite puzzled because, while debugging, I can see it
>>>>>> writes to some temporary directory.
>>>>>>
>>>>>> The mapred.output.dir setting seems to serve just to output a file named
>>>>>> _SUCCESS, and if I do not set it, the job fails with
>>>>>> java.lang.IllegalArgumentException: Can not create a Path from a null string
>>>>>>     at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>>>>>>     at org.apache.hadoop.fs.Path.<init>(Path.java:135)
>>>>>>     at
>>>>>> org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:108)
>>>>>>     at
>>>>>> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:186)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> Has anyone experienced something similar? Any hints on where to look?
>>>>>> Thanks a lot in advance!
>>>>>>
>>>>>> saluti,
>>>>>> Stefano
>>>>>>
>>>>>> ====================================================
>>>>>> Configuration conf = new Configuration();
>>>>>> conf.set("mapred.output.dir", "/tmp/");
>>>>>> conf.set(MongoUtil.MONGO_INPUT_URI_PROPERTY, collectionsUri);
>>>>>> conf.set(MongoUtil.MONGO_OUTPUT_URI_PROPERTY, collectionsUri);
>>>>>>
>>>>>> Job job = Job.getInstance(conf);
>>>>>>
>>>>>> // create a MongodbInputFormat, using a Hadoop input format wrapper
>>>>>> InputFormat<Object, BSONObject> mapreduceInputFormat =
>>>>>>         new MyMongoInputFormat<Object, BSONObject>();
>>>>>> HadoopInputFormat<Object, BSONObject> hdIf = new HadoopInputFormat<Object, BSONObject>(
>>>>>>         mapreduceInputFormat, Object.class, BSONObject.class, job);
>>>>>>
>>>>>> // `input` is presumably the DataSet read through hdIf; its creation is omitted here
>>>>>> DataSet<Tuple2<Text, BSONWritable>> fin = input
>>>>>>         .flatMap(new myFlatMapFunction()).setParallelism(16);
>>>>>>
>>>>>> MongoConfigUtil.setOutputURI(job.getConfiguration(), collectionsUri);
>>>>>>
>>>>>> // write back to MongoDB through the Hadoop output format wrapper
>>>>>> fin.output(new HadoopOutputFormat<Text, BSONWritable>(
>>>>>>         new MongoOutputFormat<Text, BSONWritable>(), job));
>>>>>> // fin.writeAsText("/tmp/out", WriteMode.OVERWRITE);
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
