I've slightly modified the program to shorten the overall runtime of the job, and this time I got this exception:
2016-05-23 09:26:51,438 ERROR org.apache.flink.runtime.io.disk.iomanager.IOManager - IO Thread 'IOManager writer thread #1' terminated due to an exception. Shutting down I/O Manager.
java.lang.ClassCastException: java.nio.DirectByteBuffer$Deallocator cannot be cast to org.apache.flink.runtime.io.disk.iomanager.WriteRequest
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

I don't know whether this is related to the others or not.

On Sat, May 21, 2016 at 11:00 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> I think this bug comes from something in
> SpillingAdaptiveSpanningRecordDeserializer. I've tried to find a common
> point of failure in all those messages, and I found that it also contains
> this error message, which I got once:
>
> private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE =
>         "Serializer consumed more bytes than the record had. " +
>         "This indicates broken serialization. If you are using custom serialization types " +
>         "(Value or Writable), check their serialization methods. If you are using a " +
>         "Kryo-serialized type, check the corresponding Kryo serializer.";
>
> Any clue about how to find what is causing this?
>
> On Sat, May 21, 2016 at 10:53 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> I tried to move the Flink tmp dir from the HDD disks to SSD ones (in order to
>> exclude faulty disks) and I got another of those exceptions:
>>
>> java.lang.IllegalArgumentException: The datetime zone id 'Europe/Romd' is not recognised
>>     at org.joda.time.DateTimeZone.forID(DateTimeZone.java:229)
>>     at de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.readTimeZone(JodaDateTimeSerializer.java:94)
>>     at de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:74)
>>     at de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:59)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:441)
>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> On Fri, May 20, 2016 at 8:34 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> Right now I'm using Flink 1.0.2. Which version should I downgrade to?
>>> The hardware seems to be OK. How could I detect faulty hardware?
>>> These errors appeared in every run of my job after I moved the temporary
>>> directory from SSD to HDD and extended my pipeline with a dataset that
>>> grows as the pipeline goes on, accumulating data from intermediate datasets.
>>> On 20 May 2016 18:31, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>>
>>>> The problem seems to occur quite often.
>>>> Did you update your Flink version recently? If so, could you try to
>>>> downgrade and see if the problem disappears?
>>>>
>>>> Is it otherwise possible that it is caused by faulty hardware?
>>>>
>>>> 2016-05-20 18:05 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>
>>>>> This time (Europd instead of Europe):
>>>>>
>>>>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(PassaggioWithComprAndVend.java:132)) -> Map (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: The datetime zone id 'Europd/Rome' is not recognised
>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: The datetime zone id 'Europd/Rome' is not recognised
>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>>>     ... 3 more
>>>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: The datetime zone id 'Europd/Rome' is not recognised
>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>>> Caused by: java.lang.IllegalArgumentException: The datetime zone id 'Europd/Rome' is not recognised
>>>>>     at org.joda.time.DateTimeZone.forID(DateTimeZone.java:229)
>>>>>     at de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.readTimeZone(JodaDateTimeSerializer.java:94)
>>>>>     at de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:74)
>>>>>     at de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:59)
>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>>>>>     at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>
>>>>> On Fri, May 20, 2016 at 4:25 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>
>>>>>> This time another error (rerialization instead of serialization):
>>>>>>
>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: it.okkam.flink.entitons.*rerialization*.pojo.EntitonQuadPojo
>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>>>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.lang.ClassNotFoundException: it.okkam.flink.entitons.rerialization.pojo.EntitonQuadPojo
>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>     at java.lang.Class.forName0(Native Method)
>>>>>>     at java.lang.Class.forName(Class.java:348)
>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>>>>     ... 20 more
>>>>>>
>>>>>> On Fri, May 20, 2016 at 12:48 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> Hi Ufuk,
>>>>>>> my records could be quite large Pojos (I think some MB).
>>>>>>> The only thing I do to configure Kryo is:
>>>>>>>
>>>>>>> env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>> On Fri, May 20, 2016 at 12:38 PM, Ufuk Celebi <u...@apache.org> wrote:
>>>>>>>
>>>>>>>> @Stefano: the records are serialized anyway for batch jobs. The
>>>>>>>> spilling deserializer is only relevant if single records are very
>>>>>>>> large. How large are your records? In any case, I don't expect this to
>>>>>>>> be the problem.
>>>>>>>>
>>>>>>>> @Flavio: The class name "typo" errors (Vdhicle instead of Vehicle and
>>>>>>>> ttil instead of util) look like some kind of data corruption and would
>>>>>>>> need further investigation. The other failure you reported might be
>>>>>>>> related to this.
>>>>>>>> As a starting point, how do you configure the Kryo serializer?
>>>>>>>>
>>>>>>>> On Fri, May 20, 2016 at 10:02 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>> > Today I got this other strange error. Obviously I don't have a
>>>>>>>> > VdhicleEvent class, but a VehicleEvent class :(
>>>>>>>> >
>>>>>>>> > java.lang.RuntimeException: Cannot instantiate class.
>>>>>>>> >     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>>>>>>>> >     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>> >     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>>>> >     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>>> >     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>>> >     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>>>>> >     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>>>>>>>> >     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>> >     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>> >     at java.lang.Thread.run(Thread.java:745)
>>>>>>>> > Caused by: java.lang.ClassNotFoundException: it.okkam.flink.test.model.pojo.VdhicleEvent
>>>>>>>> >     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>>> >     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>>> >     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>>> >     at java.lang.Class.forName0(Native Method)
>>>>>>>> >     at java.lang.Class.forName(Class.java:348)
>>>>>>>> >     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
>>>>>>>> >     ... 10 more
>>>>>>>> >
>>>>>>>> > Thanks in advance,
>>>>>>>> > Flavio
>>>>>>>> >
>>>>>>>> > On Mon, May 16, 2016 at 10:08 AM, Stefano Bortoli <s.bort...@gmail.com> wrote:
>>>>>>>> >>
>>>>>>>> >> Hi Flavio, Till,
>>>>>>>> >>
>>>>>>>> >> do you think this could be related to the serialization problem
>>>>>>>> >> caused by 'the management' of the Kryo serializer buffer when spilling to disk?
>>>>>>>> >> We are definitely going beyond what is managed in memory with this task.
>>>>>>>> >>
>>>>>>>> >> saluti,
>>>>>>>> >> Stefano
>>>>>>>> >>
>>>>>>>> >> 2016-05-16 9:44 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>> >>>
>>>>>>>> >>> That exception showed up just once, but the following happens randomly (if I
>>>>>>>> >>> re-run the job after stopping and restarting the cluster, it usually doesn't
>>>>>>>> >>> show up):
>>>>>>>> >>>
>>>>>>>> >>> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>> >>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>>>>>>>> >>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>>> >>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>>> >>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>>>>> >>>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>>>>>>>> >>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>> >>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>> >>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>> >>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>> >>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>>>>>>> >>>     at java.util.ArrayList.elementData(ArrayList.java:418)
>>>>>>>> >>>     at java.util.ArrayList.get(ArrayList.java:431)
>>>>>>>> >>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>>>>>> >>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>>>>>> >>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>>>>>> >>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>>>> >>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>>>>>>> >>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>>>>> >>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>>> >>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>> >>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>>>> >>>
>>>>>>>> >>> On Sun, May 15, 2016 at 10:58 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>> >>>>
>>>>>>>> >>>> Hi to all,
>>>>>>>> >>>> in my last run of a job I received this weird Kryo exception in one of
>>>>>>>> >>>> the TaskManagers. Obviously this class is not mentioned anywhere, neither in
>>>>>>>> >>>> my project nor in Flink.
>>>>>>>> >>>> Any help is appreciated!
>>>>>>>> >>>>
>>>>>>>> >>>> Best,
>>>>>>>> >>>> Flavio
>>>>>>>> >>>>
>>>>>>>> >>>> INFO org.apache.flink.runtime.taskmanager.Task - CHAIN GroupReduce (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at main(Jsonizer.java:128)) (4/9) switched to FAILED with exception.
>>>>>>>> >>>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at main(Jsonizer.java:128))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
>>>>>>>> >>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>>>>>>>> >>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>> >>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>> >>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
>>>>>>>> >>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>> >>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>>>>>> >>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>>>>>>>> >>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>>>>>> >>>>     ... 3 more
>>>>>>>> >>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
>>>>>>>> >>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>>>>>> >>>> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: java.ttil.HashSet
>>>>>>>> >>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>>>> >>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>>>> >>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>> >>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>>> >>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>>>>>>> >>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>>> >>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>> >>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>>>> >>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>>>>>>>> >>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
>>>>>>>> >>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
>>>>>>>> >>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>>>>>>>> >>>>     at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>>>>>>> >>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>>>>>>> >>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>> >>>> Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>>>>>>>> >>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>>> >>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>>> >>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>>> >>>>     at java.lang.Class.forName0(Native Method)
>>>>>>>> >>>>     at java.lang.Class.forName(Class.java:348)
>>>>>>>> >>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>>>>>> >>>>     ... 14 more
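
A note on the corrupted names quoted in this thread (Rome/Romd, Europe/Europd, serialization/rerialization, Vehicle/Vdhicle, util/ttil): in each pair the wrong character is exactly one code point below the expected one, which is what a single flipped bit would look like. Below is a minimal sketch in plain Java for checking this. The class BitFlipCheck and its method flippedBits are made up for illustration and are not part of Flink or Kryo. It only compares the strings byte by byte and cannot tell where the corruption happens (memory, disk, network or a software bug).

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Flink or Kryo: counts how many bits differ
// between an expected name and the corrupted name seen in a stack trace.
public class BitFlipCheck {

    /** Returns the number of differing bits between two strings of equal byte length. */
    public static int flippedBits(String expected, String observed) {
        byte[] a = expected.getBytes(StandardCharsets.UTF_8);
        byte[] b = observed.getBytes(StandardCharsets.UTF_8);
        if (a.length != b.length) {
            throw new IllegalArgumentException("byte lengths differ");
        }
        int bits = 0;
        for (int i = 0; i < a.length; i++) {
            // XOR leaves a 1 in every bit position where the two bytes disagree.
            bits += Integer.bitCount((a[i] ^ b[i]) & 0xFF);
        }
        return bits;
    }

    public static void main(String[] args) {
        // Expected vs. observed names, copied from the exceptions in this thread.
        String[][] pairs = {
            {"Europe/Rome", "Europe/Romd"},
            {"Europe/Rome", "Europd/Rome"},
            {"serialization", "rerialization"},
            {"VehicleEvent", "VdhicleEvent"},
            {"java.util.HashSet", "java.ttil.HashSet"},
        };
        for (String[] p : pairs) {
            System.out.println(p[0] + " -> " + p[1] + ": "
                    + flippedBits(p[0], p[1]) + " bit(s) differ");
        }
    }
}
```

If every pair reports a single differing bit, that is consistent with the data-corruption hypothesis raised earlier in the thread, but it does not prove it or point to a specific component.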