https://issues.apache.org/jira/browse/FLINK-2394?filter=-2

Meanwhile, I have implemented the MongoHadoopOutputFormat, overriding the
open, close and finalizeGlobal methods.
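
For reference, here is a minimal sketch of that subclass. It is an untested
outline: it assumes Flink 0.9's mapreduce wrapper exposes recordWriter,
context, mapreduceOutputFormat and configuration to subclasses (if they are
private, the wrapper class has to be copied rather than extended), and it
omits the open() override that removes the mapred.output.dir requirement.

import java.io.IOException;

import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;

public class MongoHadoopOutputFormat<K, V> extends HadoopOutputFormat<K, V> {

    public MongoHadoopOutputFormat(OutputFormat<K, V> mapreduceOutputFormat, Job job) {
        super(mapreduceOutputFormat, job);
    }

    @Override
    public void close() throws IOException {
        try {
            // flush the records, then commit the task through the committer
            // of the wrapped format (MongoOutputCommitter for Mongo), not a
            // hard-coded FileOutputCommitter
            this.recordWriter.close(this.context);
            OutputCommitter committer =
                    this.mapreduceOutputFormat.getOutputCommitter(this.context);
            if (committer.needsTaskCommit(this.context)) {
                committer.commitTask(this.context);
            }
        } catch (InterruptedException e) {
            throw new IOException("Task commit interrupted.", e);
        }
    }

    @Override
    public void finalizeGlobal(int parallelism) throws IOException {
        try {
            JobContext jobContext = HadoopUtils.instantiateJobContext(
                    this.configuration, new JobID());
            TaskAttemptContext taskContext = HadoopUtils.instantiateTaskAttemptContext(
                    this.configuration, new TaskAttemptID());
            // again, ask the wrapped format for its own committer
            this.mapreduceOutputFormat.getOutputCommitter(taskContext)
                    .commitJob(jobContext);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}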

regards,
Stefano

2015-07-22 17:11 GMT+02:00 Stephan Ewen <se...@apache.org>:

> Thanks for reporting this, Stefano!
>
> Seems like the HadoopOutputFormat wrapper is pretty much specialized for
> file output formats.
>
> Can you open an issue for that? Someone will need to look into this...
>
> On Wed, Jul 22, 2015 at 4:48 PM, Stefano Bortoli <s.bort...@gmail.com>
> wrote:
>
>> In fact, in close() of the HadoopOutputFormat, the task commit is skipped
>> because this.fileOutputCommitter.needsTaskCommit(this.context) returns
>> false.
>>
>>     /**
>>      * commit the task by moving the output file out from the temporary
>> directory.
>>      * @throws java.io.IOException
>>      */
>>     @Override
>>     public void close() throws IOException {
>>         this.recordWriter.close(new HadoopDummyReporter());
>>
>>         if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
>>             this.fileOutputCommitter.commitTask(this.context);
>>         }
>>     }
>>
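>> For context, this is the commit sequence the Hadoop mapreduce API expects
>> the wrapper to drive. A hedged outline using only the standard
>> OutputCommitter contract (the method names are the real Hadoop ones, the
>> surrounding scaffolding is assumed):
>>
>> static void driveCommit(org.apache.hadoop.mapreduce.OutputFormat<?, ?> outputFormat,
>>         org.apache.hadoop.mapreduce.TaskAttemptContext taskContext,
>>         org.apache.hadoop.mapreduce.JobContext jobContext) throws Exception {
>>     org.apache.hadoop.mapreduce.OutputCommitter committer =
>>             outputFormat.getOutputCommitter(taskContext);
>>     committer.setupJob(jobContext);        // once, before any task writes
>>     committer.setupTask(taskContext);      // in each parallel task
>>     // ... records are written through the RecordWriter here ...
>>     if (committer.needsTaskCommit(taskContext)) {
>>         committer.commitTask(taskContext); // mongo-hadoop 1.4 bulk-writes to MongoDB here
>>     }
>>     committer.commitJob(jobContext);       // once, after all tasks have committed
>> }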
>>
>> Also, both close() and finalizeGlobal() use a FileOutputCommitter, and
>> never the MongoOutputCommitter:
>>
>> @Override
>> public void finalizeGlobal(int parallelism) throws IOException {
>>
>>     try {
>>         JobContext jobContext =
>>                 HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
>>         FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();
>>
>>         // finalize HDFS output format
>>         fileOutputCommitter.commitJob(jobContext);
>>     } catch (Exception e) {
>>         throw new RuntimeException(e);
>>     }
>> }
>>
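>> The fix, I guess, is for the wrapper to ask the configured output format
>> for its committer instead of hard-coding FileOutputCommitter. A hedged
>> sketch against the mapreduce API (taskContext assumed to be available):
>>
>> // dispatches to MongoOutputCommitter when MongoOutputFormat is configured
>> OutputCommitter committer = mapreduceOutputFormat.getOutputCommitter(taskContext);
>> committer.commitJob(jobContext);
>>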
>> Can anyone have a look into that?
>>
>> regards,
>> Stefano
>>
>> 2015-07-22 15:53 GMT+02:00 Stefano Bortoli <bort...@okkam.it>:
>>
>>> Debugging, it seems the commitTask method of the MongoOutputCommitter is
>>> never called. Is it possible that the 'bulk' approach of mongo-hadoop 1.4
>>> does not fit Flink's task execution model?
>>>
>>> Any idea? Thanks a lot in advance.
>>>
>>> regards,
>>> Stefano
>>>
>>> Stefano Bortoli, PhD
>>>
>>> ENS Technical Director
>>> OKKAM Srl - www.okkam.it <http://www.okkam.it/>
>>>
>>> Email: bort...@okkam.it
>>> Phone: +39 0461 1823912
>>>
>>> Headquarters: Trento (Italy), Via Trener 8
>>> Registered office: Trento (Italy), via Segantini 23
>>>
>>> 2015-07-22 14:26 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I am trying to analyze and update a MongoDB collection with Apache
>>>> Flink 0.9.0, Mongo Hadoop 1.4.0 and Hadoop 2.6.0.
>>>>
>>>> The process is fairly simple, and the MongoInputFormat works smoothly;
>>>> however, nothing is written back to the collection. The pipeline itself
>>>> works, because writeAsText produces the expected output. I am quite
>>>> puzzled because, while debugging, I can see records being written to a
>>>> temporary directory.
>>>>
>>>> The mapred.output.dir property seems to serve only to produce a file named
>>>> _SUCCESS, and if I do not set it, the job fails with:
>>>> java.lang.IllegalArgumentException: Can not create a Path from a null
>>>> string
>>>>     at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>>>>     at org.apache.hadoop.fs.Path.<init>(Path.java:135)
>>>>     at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:108)
>>>>     at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:186)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
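>>>> If I read Flink's wrapper right, open() unconditionally builds a
>>>> FileOutputCommitter from mapred.output.dir, which would explain both the
>>>> _SUCCESS marker and the null-path failure. Roughly (paraphrased, not the
>>>> exact source):
>>>>
>>>> // inside HadoopOutputFormatBase.open(int taskNumber, int numTasks):
>>>> this.fileOutputCommitter = new FileOutputCommitter(
>>>>         new Path(this.configuration.get("mapred.output.dir")), context);
>>>> // new Path(null) -> "Can not create a Path from a null string"
>>>>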
>>>> Has anyone experienced something similar? Any hints on where to look?
>>>> Thanks a lot in advance!
>>>>
>>>> regards,
>>>> Stefano
>>>>
>>>> ====================================================
>>>> Configuration conf = new Configuration();
>>>> conf.set("mapred.output.dir", "/tmp/");
>>>> conf.set(MongoUtil.MONGO_INPUT_URI_PROPERTY, collectionsUri);
>>>> conf.set(MongoUtil.MONGO_OUTPUT_URI_PROPERTY, collectionsUri);
>>>>
>>>> Job job = Job.getInstance(conf);
>>>>
>>>> // create a MongodbInputFormat, using a Hadoop input format wrapper
>>>> InputFormat<Object, BSONObject> mapreduceInputFormat =
>>>>         new MyMongoInputFormat<Object, BSONObject>();
>>>> HadoopInputFormat<Object, BSONObject> hdIf =
>>>>         new HadoopInputFormat<Object, BSONObject>(
>>>>                 mapreduceInputFormat, Object.class, BSONObject.class, job);
>>>>
>>>> // 'input' is the DataSet read via hdIf (its creation is elided here)
>>>> DataSet<Tuple2<Text, BSONWritable>> fin = input
>>>>         .flatMap(new myFlatMapFunction()).setParallelism(16);
>>>>
>>>> MongoConfigUtil.setOutputURI(job.getConfiguration(), collectionsUri);
>>>>
>>>> // write back to MongoDB through the Hadoop output format wrapper
>>>> fin.output(new HadoopOutputFormat<Text, BSONWritable>(
>>>>         new MongoOutputFormat<Text, BSONWritable>(), job));
>>>> // fin.writeAsText("/tmp/out", WriteMode.OVERWRITE);
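>>>>
>>>> (For completeness: the creation of 'input' is elided above. Presumably it
>>>> comes from the Hadoop input wrapper, along these lines; variable names
>>>> are assumed:)
>>>>
>>>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>> DataSet<Tuple2<Object, BSONObject>> input = env.createInput(hdIf);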
>>>>
>>>>
>>>
>>
>
