Hi Ufuk,

my records could be quite large POJOs (a few MB each, I think). The only thing I do to configure Kryo is:

env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
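In the job it sits roughly like this (a minimal sketch just for context: I'm assuming the JodaDateTimeSerializer from the de.javakaffee kryo-serializers library, and the class around the registration is only illustrative):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.joda.time.DateTime;
import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;

public class JobSetupSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Joda DateTime fields inside the POJOs are handled by Kryo, so register
        // the dedicated serializer instead of letting Kryo fall back to its
        // default FieldSerializer for DateTime.
        env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);

        // ... build the DataSet program and execute it as usual ...
    }
}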
Best,
Flavio

On Fri, May 20, 2016 at 12:38 PM, Ufuk Celebi <u...@apache.org> wrote:
> @Stefano: the records are serialized anyway for batch jobs. The spilling
> deserializer is only relevant if single records are very large. How large
> are your records? In any case, I don't expect this to be the problem.
>
> @Flavio: The class name "typo" errors (Vdhicle instead of Vehicle and ttil
> instead of util) look like some kind of data corruption and would need
> further investigation. The other failure you reported might be related to
> this. As a starting point, how do you configure the Kryo serializer?
>
> On Fri, May 20, 2016 at 10:02 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> > Today I've got this other strange error.. Obviously I don't have a
> > VdhicleEvent class, but a VehicleEvent class :(
> >
> > java.lang.RuntimeException: Cannot instantiate class.
> >     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
> >     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> >     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> >     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> >     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> >     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
> >     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> >     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >     at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.ClassNotFoundException: it.okkam.flink.test.model.pojo.VdhicleEvent
> >     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >     at java.lang.Class.forName0(Native Method)
> >     at java.lang.Class.forName(Class.java:348)
> >     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
> >     ... 10 more
> >
> > Thanks in advance,
> > Flavio
> >
> > On Mon, May 16, 2016 at 10:08 AM, Stefano Bortoli <s.bort...@gmail.com> wrote:
> >>
> >> Hi Flavio, Till,
> >>
> >> do you think this can possibly be related to the serialization problem
> >> caused by 'the management' of the Kryo serializer buffer when spilling on
> >> disk? We are definitely going beyond what is managed in memory with this
> >> task.
> >>
> >> saluti,
> >> Stefano
> >>
> >> 2016-05-16 9:44 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
> >>>
> >>> That exception showed just once, but the following happens randomly (if
> >>> I re-run the job after stopping and restarting the cluster it usually
> >>> doesn't show up):
> >>>
> >>> Caused by: java.io.IOException: Serializer consumed more bytes than the
> >>> record had. This indicates broken serialization. If you are using custom
> >>> serialization types (Value or Writable), check their serialization methods.
> >>> If you are using a Kryo-serialized type, check the corresponding Kryo
> >>> serializer.
> >>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
> >>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> >>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> >>>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
> >>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> >>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> >>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >>>     at java.lang.Thread.run(Thread.java:745)
> >>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
> >>>     at java.util.ArrayList.elementData(ArrayList.java:418)
> >>>     at java.util.ArrayList.get(ArrayList.java:431)
> >>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> >>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> >>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> >>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> >>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
> >>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
> >>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> >>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> >>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> >>>
> >>> On Sun, May 15, 2016 at 10:58 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> >>>>
> >>>> Hi to all,
> >>>> in my last run of a job I received this weird Kryo Exception in one of
> >>>> the TaskManagers... obviously this class is not mentioned anywhere,
> >>>> neither in my project nor in Flink...
> >>>> Any help is appreciated!
> >>>>
> >>>> Best,
> >>>> Flavio
> >>>>
> >>>> INFO org.apache.flink.runtime.taskmanager.Task - CHAIN GroupReduce
> >>>> (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map
> >>>> at main(Jsonizer.java:128)) (4/9) switched to FAILED with exception.
> >>>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> >>>> (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map
> >>>> at main(Jsonizer.java:128))' , caused an error: Error obtaining the
> >>>> sorted input: Thread 'SortMerger spilling thread' terminated due to an
> >>>> exception: Unable to find class: java.ttil.HashSet
> >>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
> >>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >>>>     at java.lang.Thread.run(Thread.java:745)
> >>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> >>>> Thread 'SortMerger spilling thread' terminated due to an exception:
> >>>> Unable to find class: java.ttil.HashSet
> >>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
> >>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
> >>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> >>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
> >>>>     ... 3 more
> >>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> >>>> terminated due to an exception: Unable to find class: java.ttil.HashSet
> >>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> >>>> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find
> >>>> class: java.ttil.HashSet
> >>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
> >>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> >>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> >>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> >>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
> >>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> >>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> >>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> >>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
> >>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
> >>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
> >>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
> >>>>     at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
> >>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
> >>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> >>>> Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
> >>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >>>>     at java.lang.Class.forName0(Native Method)
> >>>>     at java.lang.Class.forName(Class.java:348)
> >>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> >>>>     ... 14 more
> >>>>
> >>>
> >>
> >
>
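P.S. Regarding the "check the corresponding Kryo serializer" hint in one of the traces above: as far as I understand it, it boils down to a custom Serializer's write() and read() having to consume exactly the same bytes. A minimal sketch of that symmetry, with purely hypothetical names (VehicleEventExample is not the real class from our job):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

// Placeholder POJO, only here so the sketch is self-contained.
class VehicleEventExample {
    String plate;
    long timestamp;
}

public class VehicleEventExampleSerializer extends Serializer<VehicleEventExample> {

    @Override
    public void write(Kryo kryo, Output output, VehicleEventExample event) {
        // Whatever write() emits...
        output.writeString(event.plate);
        output.writeLong(event.timestamp);
    }

    @Override
    public VehicleEventExample read(Kryo kryo, Input input, Class<VehicleEventExample> type) {
        // ...read() must consume in the same order and with the same types;
        // a mismatch makes the deserializer read the wrong number of bytes,
        // which is what the "Serializer consumed more bytes than the record
        // had" message complains about.
        VehicleEventExample event = new VehicleEventExample();
        event.plate = input.readString();
        event.timestamp = input.readLong();
        return event;
    }
}

Such a serializer would then be registered with env.registerTypeWithKryoSerializer(...) in the same way as the Joda one above.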