Hi Stephan, I think I may have found a possible root of the problem. I do not
build the fat jar; I simply execute the main with maven exec:java after the
default install and compile. No uberjar is created, no shading. I will try
building the fat jar and report back. The fact that it runs so easily in
Eclipse makes it somewhat confusing.

saluti, Stefano
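Since the job works from Eclipse but not via maven exec:java, the two runs
likely see different classpaths. Below is a minimal sketch of a sanity check
that could be dropped at the top of the job's main() to confirm the Mongo
classes actually resolve at runtime; the two class names are taken from this
thread, everything else is illustrative.

// Illustrative classpath check: verifies that the classes the job needs
// resolve from the thread context class loader before the job is submitted.
public class ClasspathCheck {

    public static void main(String[] args) {
        String[] required = {
                "com.mongodb.hadoop.MongoOutputFormat",
                "org.tagcloud.persistence.batch.MongoHadoop2OutputFormat"
        };
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        for (String name : required) {
            try {
                Class<?> clazz = Class.forName(name, false, cl);
                // Print where the class was loaded from (null for bootstrap classes).
                System.out.println(name + " -> " + clazz.getProtectionDomain().getCodeSource());
            } catch (ClassNotFoundException e) {
                System.out.println(name + " is NOT on the classpath");
            }
        }
    }
}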
2015-07-24 11:09 GMT+02:00 Stephan Ewen <se...@apache.org>:

> Hi!
>
> There is probably something going wrong in MongoOutputFormat or
> MongoHadoop2OutputFormat. Something fails, but Java swallows the problem
> during serialization.
>
> It may be a classloading issue that does not get reported. Are
> MongoOutputFormat and MongoHadoop2OutputFormat both in the fat jar? If
> not, try putting them in there.
>
> The last check we can do (to validate the Flink serialization utilities)
> is the code pasted below. If that does not cause the error, it is
> probably the issue described above.
>
> Greetings,
> Stephan
>
> ------------------------------
>
> UserCodeObjectWrapper<Object> userCode = new UserCodeObjectWrapper<Object>(
>         new MongoHadoop2OutputFormat<>(new MongoOutputFormat<>(), Job.getInstance()));
> Configuration cfg = new Configuration();
> TaskConfig taskConfig = new TaskConfig(cfg);
> taskConfig.setStubWrapper(userCode);
> taskConfig.getStubWrapper(ClassLoader.getSystemClassLoader());
>
> On Fri, Jul 24, 2015 at 10:44 AM, Stefano Bortoli <s.bort...@gmail.com> wrote:
>
>> I have implemented this test without any exception:
>>
>> package org.tagcloud.persistence.batch.test;
>>
>> import java.io.IOException;
>>
>> import org.apache.commons.lang.SerializationUtils;
>> import org.apache.hadoop.mapreduce.Job;
>> import org.tagcloud.persistence.batch.MongoHadoop2OutputFormat;
>>
>> import com.mongodb.hadoop.MongoOutputFormat;
>>
>> public class MongoHadoopSerializationTest {
>>
>>     public static void main(String[] args) {
>>         Job job;
>>         try {
>>             job = Job.getInstance();
>>             SerializationUtils.clone(
>>                     new MongoHadoop2OutputFormat<>(new MongoOutputFormat<>(), job));
>>         } catch (IOException e) {
>>             e.printStackTrace();
>>         }
>>     }
>> }
>>
>> 2015-07-24 10:01 GMT+02:00 Stephan Ewen <se...@apache.org>:
>>
>>> Hi!
>>>
>>> The user code object (the output format here) has a corrupt
>>> serialization routine.
>>>
>>> We use default Java serialization for these objects. Either the
>>> MongoHadoopOutputFormat cannot be serialized and swallows an exception,
>>> or it overrides the readObject()/writeObject() methods (from Java
>>> serialization) in an inconsistent way.
>>>
>>> To figure that out, can you try whether you can manually serialize the
>>> MongoHadoopOutputFormat?
>>>
>>> Can you try to call "SerializationUtils.clone(new
>>> MongoHadoopOutputFormat())", for example at the beginning of your main
>>> method? The SerializationUtils are part of Apache Commons and are
>>> probably in your classpath anyway.
>>>
>>> Stephan
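On the inconsistent readObject()/writeObject() case described above: a
consistent pair must read back exactly what was written, in the same order.
Below is a minimal sketch of such a pair for a wrapper around Hadoop's
non-serializable Configuration; the class and field names are illustrative
and are not the actual MongoHadoopOutputFormat code.

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.hadoop.conf.Configuration;

// Illustrative wrapper: the wrapped Configuration is not java.io.Serializable,
// so it is marked transient and written/read manually.
public class SerializableConfigurationWrapper implements Serializable {

    private static final long serialVersionUID = 1L;

    private transient Configuration conf;

    public SerializableConfigurationWrapper(Configuration conf) {
        this.conf = conf;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        // Configuration implements Hadoop's Writable, so it can write itself.
        conf.write(out);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        // Read back exactly what writeObject wrote; any asymmetry here shows
        // up on the deserializing side as errors like "unexpected block data".
        conf = new Configuration();
        conf.readFields(in);
    }
}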
>>> On Fri, Jul 24, 2015 at 9:51 AM, Stefano Bortoli <bort...@okkam.it> wrote:
>>>
>>>> Hi guys!
>>>>
>>>> I could program a data maintenance job using Flink on MongoDB. The job
>>>> runs smoothly if I start it from Eclipse. However, when I try to run it
>>>> using a bash script invoking maven exec:java, I get a serialization
>>>> exception:
>>>>
>>>> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink
>>>> (org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891)': Deserializing the
>>>> OutputFormat (org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891) failed:
>>>> Could not read the user code wrapper: unexpected block data
>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>>>>
>>>> The complete stack trace is attached. I thought it was a matter of
>>>> serializable classes, so I have made all my classes serializable...
>>>> still I get the same error. Perhaps it is not possible to do these
>>>> things with Flink.
>>>>
>>>> Any intuition? Is it doable?
>>>>
>>>> Thanks a lot for your support. :-)
>>>>
>>>> saluti,
>>>>
>>>> Stefano Bortoli, PhD
>>>> ENS Technical Director
>>>> OKKAM Srl - www.okkam.it <http://www.okkam.it/>
>>>> Email: bort...@okkam.it
>>>> Phone nr: +39 0461 1823912
>>>> Headquarters: Trento (Italy), Via Trener 8
>>>> Registered office: Trento (Italy), via Segantini 23
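The error above is raised while the JobManager deserializes the output format
against the job's class loader, not while main() serializes it. Below is a
minimal sketch that approximates that round trip outside Flink, using plain
Java serialization with an explicit ClassLoader; this is not Flink's actual
utility, and the stand-in object should be replaced with the real output
format under test.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class ClassLoaderRoundTrip {

    // Serializes the object to bytes, then deserializes it with the given
    // class loader, mimicking how a remote JobManager resolves user code.
    static Object roundTrip(Serializable obj, final ClassLoader cl) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                return Class.forName(desc.getName(), false, cl);
            }
        }) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Replace this stand-in with the actual output format instance.
        Serializable candidate = "stand-in for the output format";
        System.out.println(roundTrip(candidate, ClassLoader.getSystemClassLoader()));
    }
}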