Unless someone really invests time into debugging this, I fear that the different misspellings are not really helpful, Flavio.
On Mon, Jun 6, 2016 at 10:31 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> This time I had the following exception (obviously
> it.okkam.flinj.model.pojo.TipoSoggetto should be
> it.okkam.flink.model.pojo.TipoSoggetto).
>
> java.lang.RuntimeException: Cannot instantiate class.
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: it.okkam.flinj.model.pojo.TipoSoggetto
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
>     ... 10 more
>
> On Wed, Jun 1, 2016 at 5:44 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>> Over the last week I've been able to run the job several times without
>> any error. Then I just recompiled it and the error reappeared :(
>> This time I had:
>>
>> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at main(DataInference.java:372)) -> Map (Map at writeEntitonPojos(ParquetThriftEntitons.java:170))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>     at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98)
>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>     ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>     at java.util.ArrayList.elementData(ArrayList.java:418)
>>     at java.util.ArrayList.get(ArrayList.java:431)
>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>     ... 5 more
>>
>> I can't really find a way to understand what is causing the error :(
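A note on the hint that recurs in these traces ("If you are using a Kryo-serialized type, check the corresponding Kryo serializer"): it refers to read/write symmetry. A custom Kryo serializer whose read() consumes a different number of bytes than write() produced surfaces exactly as "Serializer consumed more bytes than the record had". Below is a minimal sketch of a symmetric serializer against the Kryo 2.x API that Flink shipped at the time; Point and PointSerializer are hypothetical names, not types from this thread, and both classes are shown in one file for brevity.

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    // Hypothetical value type, used only to illustrate the pattern.
    class Point {
        public int x;
        public int y;
    }

    public class PointSerializer extends Serializer<Point> {
        @Override
        public void write(Kryo kryo, Output output, Point p) {
            // Every byte written here must be read back by read(), in the same order.
            output.writeInt(p.x);
            output.writeInt(p.y);
        }

        @Override
        public Point read(Kryo kryo, Input input, Class<Point> type) {
            Point p = new Point();
            p.x = input.readInt();
            // Reading one field too few (or too many) here breaks the byte
            // accounting and produces the error quoted above.
            p.y = input.readInt();
            return p;
        }
    }

Such a serializer is registered on the environment the same way JodaDateTimeSerializer is registered further down in the thread, e.g. env.registerTypeWithKryoSerializer(Point.class, PointSerializer.class).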
>> On Mon, May 30, 2016 at 12:25 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>
>>> Hi Flavio,
>>>
>>> can you privately share the source code of your Flink job with me?
>>>
>>> I'm wondering whether the issue might be caused by a version mixup on the
>>> cluster (different JVM versions, or different files in the lib/ folder?).
>>> How are you deploying the Flink job?
>>>
>>> Regards,
>>> Robert
>>>
>>> On Mon, May 30, 2016 at 11:33 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>> I tried to reproduce the error on a subset of the data; reducing the
>>>> available memory and increasing GC pressure a lot (by creating a lot of
>>>> useless objects in one of the first UDFs) caused this error:
>>>>
>>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: / by zero
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>> Caused by: java.lang.ArithmeticException: / by zero
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.getSegmentsForReaders(UnilateralSortMerger.java:1651)
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.mergeChannelList(UnilateralSortMerger.java:1565)
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1417)
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>
>>>> I hope this helps to narrow down the debugging area :)
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Fri, May 27, 2016 at 8:21 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>>
>>>>> Hi!
>>>>>
>>>>> That is a pretty thing indeed :-) Will try to look into this in a few days...
>>>>>
>>>>> Stephan
>>>>>
>>>>> On Fri, May 27, 2016 at 12:10 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>
>>>>>> Running the job with the log level set to DEBUG made the job run
>>>>>> successfully... Is this meaningful? Maybe slowing down the threads a
>>>>>> little bit helps serialization?
>>>>>>
>>>>>> On Thu, May 26, 2016 at 12:34 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>> I'm still not able to reproduce the error locally, only remotely :)
>>>>>>> Any suggestions about how to try to reproduce it locally on a subset of the data?
>>>>>>> This time I had:
>>>>>>>
>>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>>>>     at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>> On Tue, May 24, 2016 at 5:47 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>> Do you have any suggestions about how to reproduce the error on a
>>>>>>>> subset of the data?
>>>>>>>> I'm trying to change the following, but I can't find a configuration
>>>>>>>> that causes the error :(
>>>>>>>>
>>>>>>>> private static ExecutionEnvironment getLocalExecutionEnv() {
>>>>>>>>     org.apache.flink.configuration.Configuration c = new org.apache.flink.configuration.Configuration();
>>>>>>>>     c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, "/tmp");
>>>>>>>>     c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, "/tmp");
>>>>>>>>     c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.9f);
>>>>>>>>     c.setLong(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
>>>>>>>>     c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
>>>>>>>>     c.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10000 s");
>>>>>>>>     c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 12);
>>>>>>>>     ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);
>>>>>>>>     env.setParallelism(16);
>>>>>>>>     env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
>>>>>>>>     return env;
>>>>>>>> }
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Flavio
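Since the "/ by zero" above comes out of the SortMerger's spilling path, one untested way to push a small local run into the same code is to shrink Flink's managed memory instead of enlarging it, so the sorter spills even on a data subset. A sketch varying the getLocalExecutionEnv() just above; TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY is an assumption about the config keys available in this Flink version, everything else appears in the thread.

    // Sketch (untested): make a small local run exercise the failing code paths.
    org.apache.flink.configuration.Configuration c = new org.apache.flink.configuration.Configuration();
    // A small managed-memory fraction makes the sorter run out of memory and
    // spill almost immediately, exercising the SpillingThread of the '/ by zero'.
    c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.1f);
    // Small memory segments make many records span buffer boundaries, stressing
    // the SpillingAdaptiveSpanningRecordDeserializer seen in every trace above.
    c.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096);
    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);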
>>>>>>>> On Tue, May 24, 2016 at 11:13 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>> The error looks really strange. Flavio, could you put together a test
>>>>>>>>> program with example data and configuration that reproduces the
>>>>>>>>> problem? Given that, we could try to debug it.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
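One way to approximate the test program Till asks for above, without a cluster, is to round-trip a single record through the same serializer stack the job uses. A sketch under stated assumptions: TipoSoggetto and JodaDateTimeSerializer are taken from the thread, the stream-wrapper utility classes are assumed present in this Flink version, and the record still needs to be filled with representative data.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.typeutils.TypeExtractor;
    import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

    import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
    import it.okkam.flink.model.pojo.TipoSoggetto;
    import org.joda.time.DateTime;

    public class SerializerRoundTrip {
        public static void main(String[] args) throws Exception {
            // Mirror the job's serializer configuration.
            ExecutionConfig config = new ExecutionConfig();
            config.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);

            TypeInformation<TipoSoggetto> typeInfo = TypeExtractor.getForClass(TipoSoggetto.class);
            TypeSerializer<TipoSoggetto> serializer = typeInfo.createSerializer(config);

            TipoSoggetto original = new TipoSoggetto(); // fill with representative data

            // Serialize into a byte array ...
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            serializer.serialize(original, new DataOutputViewStreamWrapper(bytes));

            // ... and read it back; a broken serializer typically fails here
            // or leaves unread bytes behind.
            DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(
                    new ByteArrayInputStream(bytes.toByteArray()));
            TipoSoggetto copy = serializer.deserialize(in);
            System.out.println("Round trip succeeded: " + copy);
        }
    }

Checking that the input stream is fully consumed after deserialize() returns would also catch the asymmetric read/write case sketched earlier in the thread.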