https://issues.apache.org/jira/browse/FLINK-2394?filter=-2
Meanwhile, I have implemented the MongoHadoopOutputFormat overriding the open, close and finalizeGlobal methods, along the lines of the sketch below.
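A minimal sketch of what such a wrapper could look like, assuming Hadoop 2.x (for TaskAttemptContextImpl) and mongo-hadoop 1.4's MongoOutputFormat and MongoConfigUtil; it implements Flink's OutputFormat directly instead of subclassing HadoopOutputFormatBase, whose committer field is not accessible, and it is an illustration rather than the exact class from this thread:

import java.io.IOException;

import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoConfigUtil;

// Sketch of a self-contained Flink sink around mongo-hadoop 1.4: it asks
// MongoOutputFormat for its own committer instead of using a hard-coded
// FileOutputCommitter, so commitTask() actually runs.
public class MongoHadoopOutputFormat
        implements OutputFormat<Tuple2<Text, BSONWritable>>, FinalizeOnMaster {

    private final String outputUri; // e.g. "mongodb://host:27017/db.collection"

    private transient RecordWriter<Text, BSONWritable> recordWriter;
    private transient OutputCommitter committer;
    private transient TaskAttemptContext context;

    public MongoHadoopOutputFormat(String outputUri) {
        this.outputUri = outputUri;
    }

    @Override
    public void configure(Configuration parameters) {
        // nothing to configure; the Mongo URI is passed via the constructor
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        org.apache.hadoop.conf.Configuration conf =
                new org.apache.hadoop.conf.Configuration();
        MongoConfigUtil.setOutputURI(conf, outputUri);

        // synthetic attempt id, only needed to distinguish parallel tasks
        TaskAttemptID attemptId =
                new TaskAttemptID("flink", 1, TaskType.MAP, taskNumber, 0);
        context = new TaskAttemptContextImpl(conf, attemptId);

        // typed as the abstract OutputFormat so the declared exceptions match
        org.apache.hadoop.mapreduce.OutputFormat<Text, BSONWritable> format =
                new MongoOutputFormat<Text, BSONWritable>();
        try {
            // take the committer from the format itself, never a
            // hard-coded FileOutputCommitter
            committer = format.getOutputCommitter(context);
            committer.setupTask(context);
            recordWriter = format.getRecordWriter(context);
        } catch (InterruptedException e) {
            throw new IOException("Could not open the MongoDB record writer.", e);
        }
    }

    @Override
    public void writeRecord(Tuple2<Text, BSONWritable> record) throws IOException {
        try {
            recordWriter.write(record.f0, record.f1);
        } catch (InterruptedException e) {
            throw new IOException("Writing to MongoDB was interrupted.", e);
        }
    }

    @Override
    public void close() throws IOException {
        try {
            recordWriter.close(context);
            // this is the call that flushes mongo-hadoop 1.4's buffered
            // bulk writes to the collection
            if (committer.needsTaskCommit(context)) {
                committer.commitTask(context);
            }
        } catch (InterruptedException e) {
            throw new IOException("Closing the MongoDB record writer failed.", e);
        }
    }

    @Override
    public void finalizeGlobal(int parallelism) throws IOException {
        // each task's commitTask() has already written its records, so
        // there is nothing job-wide left to commit for MongoDB
    }
}

The point is simply that the committer is obtained from the wrapped format via getOutputCommitter(), so commitTask() flushes mongo-hadoop's buffered bulk writes instead of being skipped by a FileOutputCommitter that has nothing to commit.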
saluti,
Stefano

2015-07-22 17:11 GMT+02:00 Stephan Ewen <se...@apache.org>:

> Thanks for reporting this, Stefano!
>
> It seems the HadoopOutputFormat wrapper is pretty much specialized on
> file output formats.
>
> Can you open an issue for that? Someone will need to look into this...
>
> On Wed, Jul 22, 2015 at 4:48 PM, Stefano Bortoli <s.bort...@gmail.com>
> wrote:
>
>> In fact, in close() of the HadoopOutputFormat the task is never
>> committed, because this.fileOutputCommitter.needsTaskCommit(this.context)
>> returns false:
>>
>> /**
>>  * commit the task by moving the output file out from the temporary
>>  * directory.
>>  * @throws java.io.IOException
>>  */
>> @Override
>> public void close() throws IOException {
>>     this.recordWriter.close(new HadoopDummyReporter());
>>
>>     if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
>>         this.fileOutputCommitter.commitTask(this.context);
>>     }
>> }
>>
>> Also, both close() and finalizeGlobal() use a FileOutputCommitter, and
>> never the MongoOutputCommitter:
>>
>> @Override
>> public void finalizeGlobal(int parallelism) throws IOException {
>>     try {
>>         JobContext jobContext =
>>                 HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
>>         FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();
>>
>>         // finalize HDFS output format
>>         fileOutputCommitter.commitJob(jobContext);
>>     } catch (Exception e) {
>>         throw new RuntimeException(e);
>>     }
>> }
>>
>> Can anyone have a look into that?
>>
>> saluti,
>> Stefano
>>
>> 2015-07-22 15:53 GMT+02:00 Stefano Bortoli <bort...@okkam.it>:
>>
>>> Debugging, it seems the commitTask method of the MongoOutputCommitter
>>> is never called. Is it possible that the 'bulk' approach of
>>> mongo-hadoop 1.4 does not fit the task execution model of Flink?
>>>
>>> Any idea? Thanks a lot in advance.
>>>
>>> saluti,
>>> Stefano
>>>
>>> 2015-07-22 14:26 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I am trying to analyze and update a MongoDB collection with Apache
>>>> Flink 0.9.0, Mongo Hadoop 1.4.0 and Hadoop 2.6.0.
>>>>
>>>> The process is fairly simple, and the MongoInputFormat works
>>>> smoothly; however, it does not write back to the collection. The
>>>> pipeline itself works, because writeAsText produces the expected
>>>> output. I am quite puzzled because, debugging, I can see it writes
>>>> to some temporary directory.
>>>>
>>>> mapred.output.dir seems to serve only to output a file named
>>>> _SUCCESS, and if I do not set it, the job fails with:
>>>>
>>>> java.lang.IllegalArgumentException: Can not create a Path from a null string
>>>>     at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>>>>     at org.apache.hadoop.fs.Path.<init>(Path.java:135)
>>>>     at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.open(HadoopOutputFormatBase.java:108)
>>>>     at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:186)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> Has anyone experienced something similar? Any hints on where to look?
>>>> Thanks a lot in advance!
>>>>
>>>> saluti,
>>>> Stefano
>>>>
>>>> ====================================================
>>>> Configuration conf = new Configuration();
>>>> conf.set("mapred.output.dir", "/tmp/");
>>>> conf.set(MongoUtil.MONGO_INPUT_URI_PROPERTY, collectionsUri);
>>>> conf.set(MongoUtil.MONGO_OUTPUT_URI_PROPERTY, collectionsUri);
>>>>
>>>> Job job = Job.getInstance(conf);
>>>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>> // create a MongoInputFormat, using a Hadoop input format wrapper
>>>> InputFormat<Object, BSONObject> mapreduceInputFormat =
>>>>     new MyMongoInputFormat<Object, BSONObject>();
>>>> HadoopInputFormat<Object, BSONObject> hdIf =
>>>>     new HadoopInputFormat<Object, BSONObject>(
>>>>         mapreduceInputFormat, Object.class, BSONObject.class, job);
>>>>
>>>> // read the collection with the wrapped input format
>>>> DataSet<Tuple2<Object, BSONObject>> input = env.createInput(hdIf);
>>>>
>>>> DataSet<Tuple2<Text, BSONWritable>> fin = input
>>>>     .flatMap(new myFlatMapFunction()).setParallelism(16);
>>>>
>>>> MongoConfigUtil.setOutputURI(job.getConfiguration(), collectionsUri);
>>>>
>>>> fin.output(new HadoopOutputFormat<Text, BSONWritable>(
>>>>     new MongoOutputFormat<Text, BSONWritable>(), job));
>>>> // fin.writeAsText("/tmp/out", WriteMode.OVERWRITE);
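With a wrapper along the lines sketched above, only the sink in the last snippet changes (fin and collectionsUri as above; the constructor taking the collection URI is part of the sketch, not of mongo-hadoop):

// hypothetical wiring: the rest of the pipeline stays unchanged, and
// mapred.output.dir is no longer needed since no file is committed
fin.output(new MongoHadoopOutputFormat(collectionsUri));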