I think this bug comes from something in SpillingAdaptiveSpanningRecordDeserializer. I looked for a common point of failure across all those stack traces, and I found that this class also contains the following error message, which I got once:

private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE =
        "Serializer consumed more bytes than the record had. " +
        "This indicates broken serialization. If you are using custom serialization types " +
        "(Value or Writable), check their serialization methods. If you are using a " +
        "Kryo-serialized type, check the corresponding Kryo serializer.";

Any clue about how to find what is causing this?

On Sat, May 21, 2016 at 10:53 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> I tried to move the Flink tmp dir from HDDs to SSDs (in order to
> exclude faulty disks) and I got another of those exceptions:
>
> java.lang.IllegalArgumentException: The datetime zone id 'Europe/Romd' is not recognised
>     at org.joda.time.DateTimeZone.forID(DateTimeZone.java:229)
>     at de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.readTimeZone(JodaDateTimeSerializer.java:94)
>     at de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:74)
>     at de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:59)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:441)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
>
> On Fri, May 20, 2016 at 8:34 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> Right now I'm using Flink 1.0.2. To which version should I downgrade?
>> The hardware seems to be OK. How could I detect faulty hardware?
>> These errors appeared in every run of my job after I moved the temporary
>> directory from SSD to HDD and extended my pipeline with a dataset that
>> grows as the pipeline goes on, accumulating data from intermediate datasets.
>>
>> On 20 May 2016 18:31, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>
>>> The problem seems to occur quite often.
>>> Did you update your Flink version recently? If so, could you try to
>>> downgrade and see if the problem disappears?
>>>
>>> Is it otherwise possible that it is caused by faulty hardware?
>>>
>>> 2016-05-20 18:05 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>
>>>> This time (Europd instead of Europe):
>>>>
>>>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(PassaggioWithComprAndVend.java:132)) -> Map (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: The datetime zone id 'Europd/Rome' is not recognised
>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: The datetime zone id 'Europd/Rome' is not recognised
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>>     ... 3 more
>>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: The datetime zone id 'Europd/Rome' is not recognised
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>> Caused by: java.lang.IllegalArgumentException: The datetime zone id 'Europd/Rome' is not recognised
>>>>     at org.joda.time.DateTimeZone.forID(DateTimeZone.java:229)
>>>>     at de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.readTimeZone(JodaDateTimeSerializer.java:94)
>>>>     at de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:74)
>>>>     at de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:59)
>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>>>>     at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>
>>>> On Fri, May 20, 2016 at 4:25 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> This time another error (rerialization instead of serialization):
>>>>>
>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: it.okkam.flink.entitons.*rerialization*.pojo.EntitonQuadPojo
>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.ClassNotFoundException: it.okkam.flink.entitons.rerialization.pojo.EntitonQuadPojo
>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>     at java.lang.Class.forName0(Native Method)
>>>>>     at java.lang.Class.forName(Class.java:348)
>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>>>     ... 20 more
>>>>>
>>>>> On Fri, May 20, 2016 at 12:48 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Hi Ufuk,
>>>>>> my records could be quite large Pojos (I think some MB).
>>>>>> The only thing I do to configure Kryo is:
>>>>>>
>>>>>> env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>> On Fri, May 20, 2016 at 12:38 PM, Ufuk Celebi <u...@apache.org> wrote:
>>>>>>
>>>>>>> @Stefano: the records are serialized anyway for batch jobs. The
>>>>>>> spilling deserializer is only relevant if single records are very
>>>>>>> large. How large are your records? In any case, I don't expect this
>>>>>>> to be the problem.
>>>>>>>
>>>>>>> @Flavio: The class name "typo" errors (Vdhicle instead of Vehicle and
>>>>>>> ttil instead of util) look like some kind of data corruption and
>>>>>>> would need further investigation. The other failure you reported might
>>>>>>> be related to this. As a starting point, how do you configure the Kryo
>>>>>>> serializer?
>>>>>>>
>>>>>>> On Fri, May 20, 2016 at 10:02 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>> > Today I've got this other strange error. Obviously I don't have a
>>>>>>> > VdhicleEvent class, but a VehicleEvent class :(
>>>>>>> >
>>>>>>> > java.lang.RuntimeException: Cannot instantiate class.
>>>>>>> >     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>>>>>>> >     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>> >     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>>> >     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>> >     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>> >     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>>>> >     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>>>>>>> >     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>> >     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>> >     at java.lang.Thread.run(Thread.java:745)
>>>>>>> > Caused by: java.lang.ClassNotFoundException: it.okkam.flink.test.model.pojo.VdhicleEvent
>>>>>>> >     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>> >     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>> >     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>> >     at java.lang.Class.forName0(Native Method)
>>>>>>> >     at java.lang.Class.forName(Class.java:348)
>>>>>>> >     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
>>>>>>> >     ... 10 more
>>>>>>> >
>>>>>>> > Thanks in advance,
>>>>>>> > Flavio
>>>>>>> >
>>>>>>> > On Mon, May 16, 2016 at 10:08 AM, Stefano Bortoli <s.bort...@gmail.com> wrote:
>>>>>>> >>
>>>>>>> >> Hi Flavio, Till,
>>>>>>> >>
>>>>>>> >> do you think this could be related to the serialization problem
>>>>>>> >> caused by 'the management' of the Kryo serializer buffer when spilling to disk?
>>>>>>> >> We are definitely going beyond what is managed in memory with this task.
>>>>>>> >>
>>>>>>> >> Regards,
>>>>>>> >> Stefano
>>>>>>> >>
>>>>>>> >> 2016-05-16 9:44 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>> >>>
>>>>>>> >>> That exception showed up just once, but the following happens randomly
>>>>>>> >>> (if I re-run the job after stopping and restarting the cluster, it
>>>>>>> >>> usually doesn't show up):
>>>>>>> >>>
>>>>>>> >>> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>> >>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>>>>>>> >>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>> >>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>> >>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>>>> >>>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>>>>>>> >>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>> >>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>> >>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>> >>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>> >>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>>>>>> >>>     at java.util.ArrayList.elementData(ArrayList.java:418)
>>>>>>> >>>     at java.util.ArrayList.get(ArrayList.java:431)
>>>>>>> >>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>>>>> >>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>>>>> >>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>>>>> >>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>>> >>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>>>>>> >>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>>>> >>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>> >>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>> >>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>>> >>>
>>>>>>> >>> On Sun, May 15, 2016 at 10:58 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>> >>>>
>>>>>>> >>>> Hi to all,
>>>>>>> >>>> in my last run of a job I received this weird Kryo exception in one of
>>>>>>> >>>> the TaskManagers. Obviously this class is not mentioned anywhere,
>>>>>>> >>>> neither in my project nor in Flink.
>>>>>>> >>>> Any help is appreciated!
>>>>>>> >>>>
>>>>>>> >>>> Best,
>>>>>>> >>>> Flavio
>>>>>>> >>>>
>>>>>>> >>>> INFO org.apache.flink.runtime.taskmanager.Task - CHAIN GroupReduce (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at main(Jsonizer.java:128)) (4/9) switched to FAILED with exception.
>>>>>>> >>>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at main(Jsonizer.java:128))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
>>>>>>> >>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>>>>>>> >>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>> >>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>> >>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
>>>>>>> >>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>> >>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>>>>> >>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>>>>>>> >>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>>>>> >>>>     ... 3 more
>>>>>>> >>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
>>>>>>> >>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>>>>> >>>> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: java.ttil.HashSet
>>>>>>> >>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>>> >>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>>> >>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>> >>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>> >>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>>>>>> >>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>> >>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>> >>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>>> >>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>>>>>>> >>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
>>>>>>> >>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
>>>>>>> >>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>>>>>>> >>>>     at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>>>>>> >>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>>>>>> >>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>> >>>> Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>>>>>>> >>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>> >>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>> >>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>> >>>>     at java.lang.Class.forName0(Native Method)
>>>>>>> >>>>     at java.lang.Class.forName(Class.java:348)
>>>>>>> >>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>>>>> >>>>     ... 14 more
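
The misspelled names reported in this thread can be compared against the expected ones character by character. The following is only a minimal sketch in plain Java, not code from Flink or Kryo; the class name CorruptionDiff and its diff helper are hypothetical and exist just for this illustration. For each pair it reports the positions where the two strings differ and the XOR of the differing characters.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Flink or Kryo.
public class CorruptionDiff {

    /** One position at which the observed string differs from the expected one. */
    public static final class Diff {
        public final int index;
        public final char expected;
        public final char observed;
        public final int xor; // bitwise difference between the two characters

        Diff(int index, char expected, char observed) {
            this.index = index;
            this.expected = expected;
            this.observed = observed;
            this.xor = expected ^ observed;
        }

        @Override
        public String toString() {
            return String.format("index %d: '%c' -> '%c', xor=0x%02x, bits flipped=%d",
                    index, expected, observed, xor, Integer.bitCount(xor));
        }
    }

    /** Returns every position where two equal-length strings differ. */
    public static List<Diff> diff(String expected, String observed) {
        if (expected.length() != observed.length()) {
            throw new IllegalArgumentException("strings must have the same length");
        }
        List<Diff> diffs = new ArrayList<>();
        for (int i = 0; i < expected.length(); i++) {
            if (expected.charAt(i) != observed.charAt(i)) {
                diffs.add(new Diff(i, expected.charAt(i), observed.charAt(i)));
            }
        }
        return diffs;
    }

    public static void main(String[] args) {
        // Expected name first, name seen in the exception second (taken from the thread).
        String[][] pairs = {
            {"Europe/Rome", "Europe/Romd"},
            {"Europe/Rome", "Europd/Rome"},
            {"serialization", "rerialization"},
            {"VehicleEvent", "VdhicleEvent"},
            {"java.util.HashSet", "java.ttil.HashSet"},
        };
        for (String[] pair : pairs) {
            System.out.println(pair[0] + " vs " + pair[1]);
            for (Diff d : diff(pair[0], pair[1])) {
                System.out.println("  " + d);
            }
        }
    }
}
```

For the five pairs listed, each observed name differs from the expected one in a single character (e/d, s/r or u/t), and those characters differ only in their lowest bit. That is consistent with the data-corruption suspicion raised earlier in the thread, but it does not by itself say where the corruption happens.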