Does this make the MongoHadoopOutputFormat work for you?

On Thu, Jul 23, 2015 at 12:44 PM, Stefano Bortoli <s.bort...@gmail.com> wrote:
https://issues.apache.org/jira/browse/FLINK-2394?filter=-2

Meanwhile, I have implemented the MongoHadoopOutputFormat overriding the open, close and finalizeGlobal methods.

saluti,
Stefano
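For anyone hitting the same problem: the wrapper itself is not posted in this thread, so what follows is only a minimal, untested sketch of what such an override-based output format could look like against Flink 0.9, Hadoop 2.6 and mongo-hadoop 1.4. The class name MongoHadoopSinkSketch and all of its wiring are assumptions for illustration, not Stefano's actual code; the point it tries to show is that task and job commits go through the committer returned by MongoOutputFormat.getOutputCommitter() instead of a hard-coded FileOutputCommitter.

import java.io.IOException;

import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class MongoHadoopSinkSketch
        implements OutputFormat<Tuple2<Text, BSONWritable>>, FinalizeOnMaster {

    private final String mongoOutputUri;
    private transient RecordWriter<Text, BSONWritable> recordWriter;
    private transient OutputCommitter committer;
    private transient TaskAttemptContextImpl taskContext;

    public MongoHadoopSinkSketch(String mongoOutputUri) {
        this.mongoOutputUri = mongoOutputUri;
    }

    // rebuilt per node from the URI string, keeping this OutputFormat serializable
    private Configuration buildConf() {
        Configuration conf = new Configuration();
        MongoConfigUtil.setOutputURI(conf, mongoOutputUri);
        return conf;
    }

    @Override
    public void configure(org.apache.flink.configuration.Configuration parameters) {
        // nothing to do; the Hadoop side is set up in open() / finalizeGlobal()
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        Configuration conf = buildConf();
        taskContext = new TaskAttemptContextImpl(conf,
                new TaskAttemptID("flink", 1, TaskType.MAP, taskNumber, 0));
        MongoOutputFormat<Text, BSONWritable> mongoFormat =
                new MongoOutputFormat<Text, BSONWritable>();
        try {
            // setupJob() handling is omitted for brevity
            committer = mongoFormat.getOutputCommitter(taskContext);
            committer.setupTask(taskContext);
            recordWriter = mongoFormat.getRecordWriter(taskContext);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    @Override
    public void writeRecord(Tuple2<Text, BSONWritable> record) throws IOException {
        try {
            recordWriter.write(record.f0, record.f1);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    @Override
    public void close() throws IOException {
        try {
            recordWriter.close(taskContext);
            // the step the generic wrapper skips: commit via the Mongo committer
            if (committer.needsTaskCommit(taskContext)) {
                committer.commitTask(taskContext);
            }
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    @Override
    public void finalizeGlobal(int parallelism) throws IOException {
        Configuration conf = buildConf();
        try {
            // ask the output format for its committer, never a hard-coded FileOutputCommitter
            TaskAttemptContextImpl attemptContext = new TaskAttemptContextImpl(conf,
                    new TaskAttemptID("flink", 1, TaskType.MAP, 0, 0));
            OutputCommitter jobCommitter = new MongoOutputFormat<Text, BSONWritable>()
                    .getOutputCommitter(attemptContext);
            jobCommitter.commitJob(new JobContextImpl(conf, new JobID()));
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}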
2015-07-22 17:11 GMT+02:00 Stephan Ewen <se...@apache.org>:

Thanks for reporting this, Stefano!

Seems like the HadoopOutputFormat wrapper is pretty much specialized for file output formats.

Can you open an issue for that? Someone will need to look into this...

On Wed, Jul 22, 2015 at 4:48 PM, Stefano Bortoli <s.bort...@gmail.com> wrote:

In fact, in close() of the HadoopOutputFormat the task is never committed, because this.fileOutputCommitter.needsTaskCommit(this.context) returns false.

    /**
     * commit the task by moving the output file out from the temporary directory.
     * @throws java.io.IOException
     */
    @Override
    public void close() throws IOException {
        this.recordWriter.close(new HadoopDummyReporter());

        if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
            this.fileOutputCommitter.commitTask(this.context);
        }
    }

Also, both close() and finalizeGlobal() use a FileOutputCommitter, and never the MongoOutputCommitter:

    @Override
    public void finalizeGlobal(int parallelism) throws IOException {
        try {
            JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
            FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();

            // finalize HDFS output format
            fileOutputCommitter.commitJob(jobContext);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

Can anyone have a look into that?

saluti,
Stefano

2015-07-22 15:53 GMT+02:00 Stefano Bortoli <bort...@okkam.it>:

Debugging, it seems the commitTask method of the MongoOutputCommitter is never called. Is it possible that the 'bulk' approach of mongo-hadoop 1.4 does not fit the task execution model of Flink?

Any idea? Thanks a lot in advance.

saluti,
Stefano

Stefano Bortoli, PhD
ENS Technical Director
OKKAM Srl - www.okkam.it
Email: bort...@okkam.it
Phone nr: +39 0461 1823912
Headquarters: Trento (Italy), Via Trener 8
Registered office: Trento (Italy), via Segantini 23

2015-07-22 14:26 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:

Hi,

I am trying to analyze and update a MongoDB collection with Apache Flink 0.9.0, Mongo Hadoop 1.4.0 and Hadoop 2.6.0.

The process is fairly simple, and the MongoInputFormat works smoothly; however, it does not write back to the collection. The pipeline itself works, because writeAsText produces the expected output. I am quite puzzled because, debugging, I can see it writes to some temporary directory.

The mapred.output.dir seems to serve just to output a file named _SUCCESS, and if I do not set it the job fails with:

    java.lang.IllegalArgumentException: Can not create a Path from a null string
        at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
        at org.apache.hadoop.fs.Path.<init>(Path.java:135)
        at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:108)
        at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:186)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)

Has anyone experienced something similar? Any hints on where to look? Thanks a lot in advance!

saluti,
Stefano

====================================================
    Configuration conf = new Configuration();
    conf.set("mapred.output.dir", "/tmp/");
    conf.set(MongoUtil.MONGO_INPUT_URI_PROPERTY, collectionsUri);
    conf.set(MongoUtil.MONGO_OUTPUT_URI_PROPERTY, collectionsUri);

    Job job = Job.getInstance(conf);

    // create a MongodbInputFormat, using a Hadoop input format wrapper
    InputFormat<Object, BSONObject> mapreduceInputFormat =
            new MyMongoInputFormat<Object, BSONObject>();
    HadoopInputFormat<Object, BSONObject> hdIf =
            new HadoopInputFormat<Object, BSONObject>(
                    mapreduceInputFormat, Object.class, BSONObject.class, job);

    DataSet<Tuple2<Text, BSONWritable>> fin = input
            .flatMap(new myFlatMapFunction()).setParallelism(16);

    MongoConfigUtil.setOutputURI(job.getConfiguration(), collectionsUri);

    fin.output(new HadoopOutputFormat<Text, BSONWritable>(
            new MongoOutputFormat<Text, BSONWritable>(), job));
    // fin.writeAsText("/tmp/out", WriteMode.OVERWRITE);
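To close the loop on the snippet above: with a committer-aware wrapper such as the hypothetical MongoHadoopSinkSketch sketched earlier in the thread, the sink wiring could be reduced to the lines below. Here input and collectionsUri are the ones from the snippet, env is the ExecutionEnvironment the snippet omits, and the whole thing is an illustration rather than a tested replacement.

    // hypothetical sink wiring: task/job commit handled by the Mongo committer
    DataSet<Tuple2<Text, BSONWritable>> fin = input
            .flatMap(new myFlatMapFunction()).setParallelism(16);

    fin.output(new MongoHadoopSinkSketch(collectionsUri));

    env.execute("update mongo collection");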