Thanks for letting us know! The problem with Java Serialization is that it often swallows exceptions and you only see a "corrupted byte stream" error in the end. So far, I have found no workaround for that.

Stephan
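(Not part of the original thread: a minimal sketch of one way to make the swallowed error visible. It round-trips an object through Java Serialization while resolving classes against an explicit ClassLoader, so a class-resolution problem surfaces as a ClassNotFoundException right away instead of as a vague "unexpected block data" later. The class name SerializationProbe and the placeholder in main() are illustrative only.)

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

public class SerializationProbe {

    // Serialize the object, then deserialize it while resolving classes against
    // the given ClassLoader. A missing or unloadable class surfaces here as a
    // ClassNotFoundException instead of a corrupted-stream error later on.
    public static Object roundTrip(Object obj, final ClassLoader loader)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(obj);
        oos.close();

        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                try {
                    return Class.forName(desc.getName(), false, loader);
                } catch (ClassNotFoundException e) {
                    // fall back to the default lookup (handles primitives etc.)
                    return super.resolveClass(desc);
                }
            }
        };
        try {
            return ois.readObject();
        } finally {
            ois.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical placeholder: replace with the actual OutputFormat instance
        // that is handed to the DataSink.
        Object candidate = "some Serializable user-code object";
        roundTrip(candidate, Thread.currentThread().getContextClassLoader());
        System.out.println("Serialization round trip succeeded.");
    }
}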
On Fri, Jul 24, 2015 at 11:31 AM, Stefano Bortoli <s.bort...@gmail.com> wrote:

> It seems there is a problem with the maven class loading. I have created
> the uberjar and then executed it the traditional way with java -cp
> uberjar.jar args, and it worked with no problems. It could be interesting
> to investigate the reason, as maven exec is very convenient. However, with
> the uberjar the classpath problems are eased, so I can live with it.
>
> thanks a lot for your support.
>
> saluti,
> Stefano
>
> 2015-07-24 11:17 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:
>
>> Hi Stephan,
>>
>> I think I may have found a possible root of the problem. I do not build
>> the fat jar; I simply execute the main with maven exec:java with the
>> default install and compile. No uberjar is created, no shading. I will
>> try that and report back. The fact that it runs in Eclipse so easily
>> makes it somewhat confusing.
>>
>> saluti,
>> Stefano
>>
>> 2015-07-24 11:09 GMT+02:00 Stephan Ewen <se...@apache.org>:
>>
>>> Hi!
>>>
>>> There is probably something going wrong in MongoOutputFormat or
>>> MongoHadoop2OutputFormat. Something fails, but Java swallows the problem
>>> during serialization.
>>>
>>> It may be a classloading issue that does not get reported. Are the
>>> MongoOutputFormat and the MongoHadoop2OutputFormat both in the fat jar?
>>> If not, try putting them in there.
>>>
>>> The last check we could do (to validate the Flink serialization
>>> utilities) is the code pasted below. If that does not cause the error,
>>> it is probably the issue described above.
>>>
>>> Greetings,
>>> Stephan
>>>
>>> ------------------------------
>>>
>>> UserCodeObjectWrapper<Object> userCode = new UserCodeObjectWrapper<Object>(
>>>         new MongoHadoop2OutputFormat<>(new MongoOutputFormat<>(), Job.getInstance()));
>>> Configuration cfg = new Configuration();
>>> TaskConfig taskConfig = new TaskConfig(cfg);
>>> taskConfig.setStubWrapper(userCode);
>>> taskConfig.getStubWrapper(ClassLoader.getSystemClassLoader());
>>>
>>>
>>> On Fri, Jul 24, 2015 at 10:44 AM, Stefano Bortoli <s.bort...@gmail.com> wrote:
>>>
>>>> I have implemented this test without any exception:
>>>>
>>>> package org.tagcloud.persistence.batch.test;
>>>>
>>>> import java.io.IOException;
>>>>
>>>> import org.apache.commons.lang.SerializationUtils;
>>>> import org.apache.hadoop.mapreduce.Job;
>>>> import org.tagcloud.persistence.batch.MongoHadoop2OutputFormat;
>>>>
>>>> import com.mongodb.hadoop.MongoOutputFormat;
>>>>
>>>> public class MongoHadoopSerializationTest {
>>>>
>>>>     public static void main(String[] args) {
>>>>         Job job;
>>>>         try {
>>>>             job = Job.getInstance();
>>>>             SerializationUtils.clone(new MongoHadoop2OutputFormat<>(new MongoOutputFormat<>(), job));
>>>>         } catch (IOException e) {
>>>>             e.printStackTrace();
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> 2015-07-24 10:01 GMT+02:00 Stephan Ewen <se...@apache.org>:
>>>>
>>>>> Hi!
>>>>>
>>>>> The user code object (the output format here) has a corrupt
>>>>> serialization routine.
>>>>>
>>>>> We use default Java Serialization for these objects. Either the
>>>>> MongoHadoopOutputFormat cannot be serialized and swallows an exception,
>>>>> or it overrides the readObject() / writeObject() methods (from Java
>>>>> Serialization) in an inconsistent way.
>>>>>
>>>>> To figure that out, can you try whether you can manually serialize the
>>>>> MongoHadoopOutputFormat?
>>>>> Can you try and call "SerializationUtils.clone(new MongoHadoopOutputFormat)",
>>>>> for example at the beginning of your main method? The SerializationUtils are
>>>>> part of Apache Commons and are probably in your class path anyway.
>>>>>
>>>>> Stephan
>>>>>
>>>>> On Fri, Jul 24, 2015 at 9:51 AM, Stefano Bortoli <bort...@okkam.it> wrote:
>>>>>
>>>>>> Hi guys!
>>>>>>
>>>>>> I could program a data maintenance job using Flink on MongoDB. The
>>>>>> job runs smoothly if I start it from Eclipse. However, when I try to
>>>>>> run it using a bash script invoking maven exec:java, I get a
>>>>>> serialization exception:
>>>>>>
>>>>>> org.apache.flink.runtime.client.JobExecutionException: Cannot
>>>>>> initialize task 'DataSink
>>>>>> (org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891)':
>>>>>> Deserializing the OutputFormat
>>>>>> (org.tagcloud.persistence.batch.MongoHadoopOutputFormat@19b88891)
>>>>>> failed: Could not read the user code wrapper: unexpected block data
>>>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
>>>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>>>>>>
>>>>>> Attached is the complete stack trace. I thought it was a matter of
>>>>>> serializable classes, so I have made all my classes serializable...
>>>>>> still I have the same error. Perhaps it is not possible to do these
>>>>>> things with Flink.
>>>>>>
>>>>>> Any intuition? Is it doable?
>>>>>>
>>>>>> thanks a lot for your support. :-)
>>>>>>
>>>>>> saluti,
>>>>>>
>>>>>> Stefano Bortoli, PhD
>>>>>>
>>>>>> ENS Technical Director
>>>>>> _______________________________________________
>>>>>> OKKAM Srl - www.okkam.it <http://www.okkam.it/>
>>>>>>
>>>>>> Email: bort...@okkam.it
>>>>>> Phone nr: +39 0461 1823912
>>>>>> Headquarters: Trento (Italy), Via Trener 8
>>>>>> Registered office: Trento (Italy), via Segantini 23
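(Not part of the original thread: a minimal sketch of what a consistent readObject()/writeObject() pair looks like for a wrapper that carries a Hadoop Configuration, since Stephan's diagnosis above points at inconsistent overrides of these methods. Every value written in writeObject() must be read back in readObject() in the same order; if the two methods drift apart, deserialization fails later with errors such as "unexpected block data". The class HadoopConfigHolder is hypothetical, not the actual MongoHadoop2OutputFormat.)

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.hadoop.conf.Configuration;

// Hypothetical wrapper, only to illustrate the symmetry requirement.
public class HadoopConfigHolder implements Serializable {

    private static final long serialVersionUID = 1L;

    // Hadoop's Configuration is not java.io.Serializable, so it must be
    // written and read by hand, in exactly the same order on both sides.
    private transient Configuration conf;

    public HadoopConfigHolder(Configuration conf) {
        this.conf = conf;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();   // 1. non-transient fields
        conf.write(out);            // 2. the Hadoop configuration (Writable)
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();     // 1. must mirror defaultWriteObject()
        conf = new Configuration();
        conf.readFields(in);        // 2. must mirror conf.write(out)
    }
}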