For the last week I've been able to run the job several times without any error. Then I just recompiled it and the error reappeared :( This time I had:
java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at main(DataInference.java:372)) -> Map (Map at writeEntitonPojos(ParquetThriftEntitons.java:170))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
    at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:98)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
    at java.util.ArrayList.elementData(ArrayList.java:418)
    at java.util.ArrayList.get(ArrayList.java:431)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
    ... 5 more

I can't really find a way to understand what is causing this error :(

On Mon, May 30, 2016 at 12:25 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Flavio,
>
> can you privately share the source code of your Flink job with me?
>
> I'm wondering whether the issue might be caused by a version mixup between
> different versions on the cluster (different JVM versions? or different
> files in the lib/ folder?). How are you deploying the Flink job?
>
> Regards,
> Robert
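[For what the error message's advice amounts to in practice: a custom Kryo serializer has to read back exactly the bytes it wrote, in the same order; any asymmetry can surface later as "Serializer consumed more bytes than the record had" or as corrupted reference indices like the ArrayIndexOutOfBoundsException above. A minimal sketch of the pattern, using a hypothetical Interval type rather than anything from the actual job:

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    // Hypothetical value type, for illustration only.
    class Interval {
        long start;
        long end;
    }

    // The invariant the error message refers to: read() must consume
    // exactly the bytes that write() produced, in the same order.
    class IntervalSerializer extends Serializer<Interval> {
        @Override
        public void write(Kryo kryo, Output output, Interval interval) {
            output.writeLong(interval.start);
            output.writeLong(interval.end);
        }

        @Override
        public Interval read(Kryo kryo, Input input, Class<Interval> type) {
            Interval interval = new Interval();
            interval.start = input.readLong(); // mirrors write() field by field
            interval.end = input.readLong();
            return interval;
        }
    }

Such a serializer would then be registered the same way as the JodaDateTimeSerializer in the snippet further down the thread, i.e. env.registerTypeWithKryoSerializer(Interval.class, IntervalSerializer.class).]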
> On Mon, May 30, 2016 at 11:33 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> I tried to reproduce the error on a subset of the data: reducing the
>> available memory and putting a lot of GC pressure on the job (by creating
>> lots of useless objects in one of the first UDFs) actually caused this
>> error:
>>
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: / by zero
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>> Caused by: java.lang.ArithmeticException: / by zero
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.getSegmentsForReaders(UnilateralSortMerger.java:1651)
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.mergeChannelList(UnilateralSortMerger.java:1565)
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1417)
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>
>> I hope this helps to narrow down the debugging area :)
>>
>> Best,
>> Flavio
>>
>> On Fri, May 27, 2016 at 8:21 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi!
>>>
>>> That is a pretty thing indeed :-) Will try to look into this in a few
>>> days...
>>>
>>> Stephan
>>>
>>> On Fri, May 27, 2016 at 12:10 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>>> Running the job with log level set to DEBUG made the job run
>>>> successfully... Is this meaningful..? Maybe slowing down the threads a
>>>> little bit helps serialization?
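[On the recurring question below of reproducing this locally on a subset of the data: since both failing threads live in the sort/merge path, one lever is to starve the local environment of managed memory so the sorter spills and merges early even on small inputs. A minimal sketch, reusing only ConfigConstants that already appear in the snippet further down; the concrete values are guesses to experiment with, not known-good settings:

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;

    public class MemoryStarvedLocalEnv {
        static ExecutionEnvironment get() {
            Configuration c = new Configuration();
            // Shrink the managed memory pool so sorters run out of buffers
            // early and take the spilling/merging code path on small inputs.
            c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.1f);
            // Keep slots high relative to memory: managed memory is split up
            // per slot, so more slots generally means less memory per sorter.
            c.setLong(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
            c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);
            env.setParallelism(16);
            return env;
        }
    }
]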
>>>> On Thu, May 26, 2016 at 12:34 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> Still not able to reproduce the error locally, only remotely :)
>>>>> Any suggestions about how to reproduce it locally on a subset of the data?
>>>>> This time I had:
>>>>>
>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>>     at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>> On Tue, May 24, 2016 at 5:47 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Do you have any suggestion about how to reproduce the error on a
>>>>>> subset of the data?
>>>>>> I'm trying to change the following settings, but I can't find a
>>>>>> configuration that causes the error :(
>>>>>>
>>>>>> private static ExecutionEnvironment getLocalExecutionEnv() {
>>>>>>     org.apache.flink.configuration.Configuration c =
>>>>>>         new org.apache.flink.configuration.Configuration();
>>>>>>     c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, "/tmp");
>>>>>>     c.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, "/tmp");
>>>>>>     c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.9f);
>>>>>>     c.setLong(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
>>>>>>     c.setLong(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
>>>>>>     c.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10000 s");
>>>>>>     c.setLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048 * 12);
>>>>>>     ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);
>>>>>>     env.setParallelism(16);
>>>>>>     env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
>>>>>>     return env;
>>>>>> }
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>> On Tue, May 24, 2016 at 11:13 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>
>>>>>>> The error looks really strange. Flavio, could you put together a test
>>>>>>> program with example data and configuration that reproduces the
>>>>>>> problem? Given that, we could try to debug it.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
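[Regarding the request for a self-contained test program: a skeleton of what such a reproducer could look like, pushing synthetic tuples through a coGroup so the sort/merge and serialization paths are exercised without the real dataset. All types and sizes here are placeholders, not taken from the actual job:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class CoGroupRepro {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Synthetic skewed input: many records per key, to force real
            // sorting work on both coGroup inputs.
            List<Tuple2<Long, String>> left = new ArrayList<>();
            List<Tuple2<Long, String>> right = new ArrayList<>();
            for (long i = 0; i < 100_000; i++) {
                left.add(new Tuple2<>(i % 1000, "left-" + i));
                right.add(new Tuple2<>(i % 1000, "right-" + i));
            }

            DataSet<Tuple2<Long, String>> a = env.fromCollection(left);
            DataSet<Tuple2<Long, String>> b = env.fromCollection(right);

            a.coGroup(b)
             .where(0)
             .equalTo(0)
             .with(new CoGroupFunction<Tuple2<Long, String>, Tuple2<Long, String>, Long>() {
                 @Override
                 public void coGroup(Iterable<Tuple2<Long, String>> first,
                                     Iterable<Tuple2<Long, String>> second,
                                     Collector<Long> out) {
                     // Count both sides per key; the work itself is irrelevant,
                     // only the shuffle/sort/deserialize path matters.
                     long count = 0;
                     for (Tuple2<Long, String> ignored : first) { count++; }
                     for (Tuple2<Long, String> ignored : second) { count++; }
                     out.collect(count);
                 }
             })
             .output(new DiscardingOutputFormat<Long>());

            env.execute("coGroup reproducer");
        }
    }

Swapping in the problematic POJO and the memory-starved local environment sketched earlier would bring it closer to the failing setup.]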