Changing

   - taskmanager.memory.fraction, from 0.7 to 0.9
   - taskmanager.memory.off-heap, from true to false
   - decreasing the slots of each tm from 2 to 1
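
For reference, the first two settings are flink-conf.yaml keys; a sketch of the changed values (the slot-count key name is an assumption based on the Flink 1.0.x configuration docs, not quoted in this thread):

```yaml
# flink-conf.yaml (Flink 1.0.x) -- values as described above
taskmanager.memory.fraction: 0.9      # was 0.7
taskmanager.memory.off-heap: false    # was true
taskmanager.numberOfTaskSlots: 1      # was 2
```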

I had this Exception:
java.lang.Exception: The data preparation for task 'GroupReduce
(GroupReduce at main(AciDataInference.java:331))' , caused an error: Error
obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
due to an exception: -2
        at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
        at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
Thread 'SortMerger spilling thread' terminated due to an exception: -2
        at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
        at
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
        at
org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
terminated due to an exception: -2
        at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
        at java.util.ArrayList.elementData(ArrayList.java:418)
        at java.util.ArrayList.get(ArrayList.java:431)
        at
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
        at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
        at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
        at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
        at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
        at
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
        at
org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
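
One pattern worth noting across the failures in this thread: every corrupted name ('java.ttil.HashSet' for java.util.HashSet, 'VdhicleEvent' for VehicleEvent, 'Europd/Rome' and 'Europe/Romd' for Europe/Rome, 'rerialization' for serialization) differs from the expected spelling by a single flipped bit. A minimal sketch (not from the thread) that checks this:

```python
# The corrupted names from the stack traces in this thread, paired with
# the spelling the job actually uses.
pairs = [
    ("java.util.HashSet", "java.ttil.HashSet"),
    ("VehicleEvent", "VdhicleEvent"),
    ("Europe/Rome", "Europd/Rome"),
    ("Europe/Rome", "Europe/Romd"),
    ("serialization", "rerialization"),
]

for expected, seen in pairs:
    # Positions where the two strings disagree.
    diffs = [(i, a, b) for i, (a, b) in enumerate(zip(expected, seen)) if a != b]
    for i, a, b in diffs:
        flipped = bin(ord(a) ^ ord(b)).count("1")
        print(f"{expected} -> {seen}: byte {i}, xor 0x{ord(a) ^ ord(b):02x}, "
              f"{flipped} bit(s) flipped")
```

Each pair differs in exactly one byte, and the XOR of the two bytes is always 0x01 (one flipped low bit). That is consistent with the data-corruption / faulty-hardware suspicion raised further down in the thread, rather than a logic bug in any one serializer.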


On Mon, May 23, 2016 at 10:04 AM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Changing
>
>    - taskmanager.memory.fraction, from 0.9 to 0.7
>    - taskmanager.memory.off-heap, from false to true
>    - decreasing the slots of each tm from 3 to 2
>
> I had this error:
>
> 2016-05-23 09:55:42,534 ERROR
> org.apache.flink.runtime.operators.BatchTask                  - Error in
> task code:  CHAIN FlatMap (FlatMap at main(MyApp.java:246)) -> Map (Key
> Extractor) (7/14)
> java.io.IOException: Received an event in channel 0 while still having
> data from a record. This indicates broken serialization logic. If you are
> using custom serialization code (Writable or Value types), check their
> serialization routines. In the case of Kryo, check the respective Kryo
> serializer.
>         at
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:90)
>         at
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>         at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>         at
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
>
>
> On Mon, May 23, 2016 at 9:31 AM, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> I've slightly modified the program to shorten the length of the entire
>> job, and this time I had this Exception:
>>
>> 2016-05-23 09:26:51,438 ERROR
>> org.apache.flink.runtime.io.disk.iomanager.IOManager          - IO Thread
>> 'IOManager writer thread #1' terminated due to an exception. Shutting down
>> I/O Manager.
>> java.lang.ClassCastException: java.nio.DirectByteBuffer$Deallocator
>> cannot be cast to org.apache.flink.runtime.io.disk.iomanager.WriteRequest
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)
>>
>>
>> I don't know whether this is related to the others or not.
>>
>>
>> On Sat, May 21, 2016 at 11:00 AM, Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>>> I think this bug comes from something in
>>> SpillingAdaptiveSpanningRecordDeserializer. I've tried to find a common
>>> point of failure in all those messages, and I found that it also contains
>>> this error message, which I got once:
>>>
>>> private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE =
>>>                         "Serializer consumed more bytes than the record
>>> had. " +
>>>                         "This indicates broken serialization. If you are
>>> using custom serialization types " +
>>>                         "(Value or Writable), check their serialization
>>> methods. If you are using a " +
>>>                         "Kryo-serialized type, check the corresponding
>>> Kryo serializer.";
>>>
>>> Any clue about how to find what is causing this?
>>>
>>>
>>>
>>>
>>> On Sat, May 21, 2016 at 10:53 AM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
>>>> I tried to move the Flink tmp dir from HDD disks to SSD ones (in order
>>>> to exclude faulty disks) and I had another of those Exceptions:
>>>>
>>>> java.lang.IllegalArgumentException: The datetime zone id 'Europe/Romd'
>>>> is not recognised
>>>> at org.joda.time.DateTimeZone.forID(DateTimeZone.java:229)
>>>> at
>>>> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.readTimeZone(JodaDateTimeSerializer.java:94)
>>>> at
>>>> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:74)
>>>> at
>>>> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:59)
>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:501)
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:441)
>>>> at
>>>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>> at
>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>> at
>>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>> at
>>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>> at
>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>> at
>>>> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>> at
>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>>
>>>> On Fri, May 20, 2016 at 8:34 PM, Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> Right now I'm using Flink 1.0.2... to which version should I downgrade?
>>>>> The hardware seems to be OK... how could I detect faulty hardware?
>>>>> These errors appeared in every run of my job after I moved the
>>>>> temporary directory from SSD to HDD and extended my pipeline with a
>>>>> dataset that grows as the pipeline goes on, accumulating data from
>>>>> intermediate datasets.
>>>>> On 20 May 2016 18:31, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>>>>
>>>>>> The problem seems to occur quite often.
>>>>>> Did you update your Flink version recently? If so, could you try to
>>>>>> downgrade and see if the problem disappears?
>>>>>>
>>>>>> Is it otherwise possible that it is caused by faulty hardware?
>>>>>>
>>>>>> 2016-05-20 18:05 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>
>>>>>>> This time (Europd instead of Europe):
>>>>>>>
>>>>>>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
>>>>>>> (GroupReduce at createResult(PassaggioWithComprAndVend.java:132)) -> 
>>>>>>> Map (Key Extractor)' , caused an error: Error obtaining the sorted 
>>>>>>> input: Thread 'SortMerger spilling thread' terminated due to an 
>>>>>>> exception: The datetime zone id 'Europd/Rome' is not recognised
>>>>>>>         at 
>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>>>>>>>         at 
>>>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted 
>>>>>>> input: Thread 'SortMerger spilling thread' terminated due to an 
>>>>>>> exception: The datetime zone id 'Europd/Rome' is not recognised
>>>>>>>         at 
>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>         at 
>>>>>>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>>>>>         at 
>>>>>>> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>>>>>>>         at 
>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>>>>>         ... 3 more
>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' 
>>>>>>> terminated due to an exception: The datetime zone id 'Europd/Rome' is 
>>>>>>> not recognised
>>>>>>>         at 
>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>>>>> Caused by: java.lang.IllegalArgumentException: The datetime zone id 
>>>>>>> 'Europd/Rome' is not recognised
>>>>>>>         at org.joda.time.DateTimeZone.forID(DateTimeZone.java:229)
>>>>>>>         at 
>>>>>>> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.readTimeZone(JodaDateTimeSerializer.java:94)
>>>>>>>         at 
>>>>>>> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:74)
>>>>>>>         at 
>>>>>>> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:59)
>>>>>>>         at 
>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>         at 
>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>>>         at 
>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>>>>>>>         at 
>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
>>>>>>>         at 
>>>>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
>>>>>>>         at 
>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>>>>>>>         at 
>>>>>>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>>>>>>         at 
>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>>>>>>         at 
>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, May 20, 2016 at 4:25 PM, Flavio Pompermaier <
>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> This time another error (rerialization instead of serialization):
>>>>>>>>
>>>>>>>> com.esotericsoftware.kryo.KryoException: Unable to find class: 
>>>>>>>> it.okkam.flink.entitons.*rerialization*.pojo.EntitonQuadPojo
>>>>>>>>        at 
>>>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>>>>        at 
>>>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>>>>        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>>        at 
>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>>>        at 
>>>>>>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>>>>>>>        at 
>>>>>>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>>>>>>        at 
>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>>        at 
>>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>>>>        at 
>>>>>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>>>>>>>        at 
>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>>>>>        at 
>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>>>        at 
>>>>>>>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>>        at 
>>>>>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>>>>        at 
>>>>>>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>>>        at 
>>>>>>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>>>        at 
>>>>>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>>>>>        at 
>>>>>>>> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>>>>>>>>        at 
>>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>>        at 
>>>>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>>        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>        at java.lang.Thread.run(Thread.java:745)
>>>>>>>> Caused by: java.lang.ClassNotFoundException: 
>>>>>>>> it.okkam.flink.entitons.rerialization.pojo.EntitonQuadPojo
>>>>>>>>        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>>>        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>>>        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>>>        at java.lang.Class.forName0(Native Method)
>>>>>>>>        at java.lang.Class.forName(Class.java:348)
>>>>>>>>        at 
>>>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>>>>>>        ... 20 more
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, May 20, 2016 at 12:48 PM, Flavio Pompermaier <
>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> Hi Ufuk,
>>>>>>>>> my records can be quite large POJOs (I think a few MB).
>>>>>>>>> The only thing I do to configure Kryo is:
>>>>>>>>>
>>>>>>>>>  env.registerTypeWithKryoSerializer(DateTime.class,
>>>>>>>>> JodaDateTimeSerializer.class );
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>> On Fri, May 20, 2016 at 12:38 PM, Ufuk Celebi <u...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> @Stefano: the records are serialized anyway for batch jobs. The
>>>>>>>>>> spilling deserializer is only relevant if single records are very
>>>>>>>>>> large. How large are your records? In any case, I don't expect
>>>>>>>>>> this to
>>>>>>>>>> be the problem.
>>>>>>>>>>
>>>>>>>>>> @Flavio: The class name "typo" errors (Vdhicle instead of Vehicle
>>>>>>>>>> and
>>>>>>>>>> ttil instead of util) look like some kind of data corruption and
>>>>>>>>>> would
>>>>>>>>>> need further investigation. The other failure you reported might
>>>>>>>>>> be
>>>>>>>>>> related to this. As a starting point, how do you configure the
>>>>>>>>>> Kryo
>>>>>>>>>> serializer?
>>>>>>>>>>
>>>>>>>>>> On Fri, May 20, 2016 at 10:02 AM, Flavio Pompermaier
>>>>>>>>>> <pomperma...@okkam.it> wrote:
>>>>>>>>>> > Today I've got this other strange error.. Obviously I don't
>>>>>>>>>> have a
>>>>>>>>>> > VdhicleEvent class, but a VehicleEvent class :(
>>>>>>>>>> >
>>>>>>>>>> > java.lang.RuntimeException: Cannot instantiate class.
>>>>>>>>>> >       at
>>>>>>>>>> >
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
>>>>>>>>>> >       at
>>>>>>>>>> >
>>>>>>>>>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>>>> >       at
>>>>>>>>>> >
>>>>>>>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>>>>>> >       at
>>>>>>>>>> >
>>>>>>>>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>>>>> >       at
>>>>>>>>>> >
>>>>>>>>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>>>>> >       at
>>>>>>>>>> >
>>>>>>>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>>>>>>> >       at
>>>>>>>>>> >
>>>>>>>>>> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>>>>>>>>>> >       at
>>>>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>>>> >       at
>>>>>>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>>>> >       at
>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>> >       at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>> > Caused by: java.lang.ClassNotFoundException:
>>>>>>>>>> > it.okkam.flink.test.model.pojo.VdhicleEvent
>>>>>>>>>> >       at
>>>>>>>>>> java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>>>>> >       at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>>>>> >       at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>>>>> >       at java.lang.Class.forName0(Native Method)
>>>>>>>>>> >       at java.lang.Class.forName(Class.java:348)
>>>>>>>>>> >       at
>>>>>>>>>> >
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
>>>>>>>>>> >       ... 10 more
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > Thanks in advance,
>>>>>>>>>> > Flavio
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > On Mon, May 16, 2016 at 10:08 AM, Stefano Bortoli <
>>>>>>>>>> s.bort...@gmail.com>
>>>>>>>>>> > wrote:
>>>>>>>>>> >>
>>>>>>>>>> >> Hi Flavio, Till,
>>>>>>>>>> >>
>>>>>>>>>> >> do you think this could possibly be related to the serialization
>>>>>>>>>> problem
>>>>>>>>>> >> caused by 'the management' of Kryo serializer buffer when
>>>>>>>>>> spilling on disk?
>>>>>>>>>> >> We are definitely going beyond what is managed in memory with
>>>>>>>>>> this task.
>>>>>>>>>> >>
>>>>>>>>>> >> saluti,
>>>>>>>>>> >> Stefano
>>>>>>>>>> >>
>>>>>>>>>> >> 2016-05-16 9:44 GMT+02:00 Flavio Pompermaier <
>>>>>>>>>> pomperma...@okkam.it>:
>>>>>>>>>> >>>
>>>>>>>>>> >>> That exception showed just once, but the following happens
>>>>>>>>>> randomly (if I
>>>>>>>>>> >>> re-run the job after stopping and restarting the cluster it
>>>>>>>>>> doesn't show up
>>>>>>>>>> >>> usually):
>>>>>>>>>> >>>
>>>>>>>>>> >>> Caused by: java.io.IOException: Serializer consumed more
>>>>>>>>>> bytes than the
>>>>>>>>>> >>> record had. This indicates broken serialization. If you are
>>>>>>>>>> using custom
>>>>>>>>>> >>> serialization types (Value or Writable), check their
>>>>>>>>>> serialization methods.
>>>>>>>>>> >>> If you are using a Kryo-serialized type, check the
>>>>>>>>>> corresponding Kryo
>>>>>>>>>> >>> serializer.
>>>>>>>>>> >>>     at
>>>>>>>>>> >>>
>>>>>>>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>>>>>>>>>> >>>     at
>>>>>>>>>> >>>
>>>>>>>>>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>>>>>>>>>> >>>     at
>>>>>>>>>> >>>
>>>>>>>>>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>>>>>>>>>> >>>     at
>>>>>>>>>> >>>
>>>>>>>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>>>>>>>>>> >>>     at
>>>>>>>>>> >>>
>>>>>>>>>> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>>>>>>>>>> >>>     at
>>>>>>>>>> >>>
>>>>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>>>>>>>>>> >>>     at
>>>>>>>>>> >>>
>>>>>>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>>>> >>>     at
>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>> >>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>> >>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>>>>>>>>>> >>>     at java.util.ArrayList.elementData(ArrayList.java:418)
>>>>>>>>>> >>>     at java.util.ArrayList.get(ArrayList.java:431)
>>>>>>>>>> >>>     at
>>>>>>>>>> >>>
>>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>>>>>>>>> >>>     at
>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>>>>>>>>> >>>     at
>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>>>>>>>>> >>>     at
>>>>>>>>>> >>>
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>>>>>> >>>     at
>>>>>>>>>> >>>
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>>>>>>>>> >>>     at
>>>>>>>>>> >>>
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>>>>>>>>> >>>     at
>>>>>>>>>> >>>
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>>>>> >>>     at
>>>>>>>>>> >>>
>>>>>>>>>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>>>>>>>>> >>>     at
>>>>>>>>>> >>>
>>>>>>>>>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>> On Sun, May 15, 2016 at 10:58 AM, Flavio Pompermaier
>>>>>>>>>> >>> <pomperma...@okkam.it> wrote:
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> Hi to all,
>>>>>>>>>> >>>> in my last run of a job I received this weird Kryo Exception
>>>>>>>>>> in one of
>>>>>>>>>> >>>> the TaskManagers... obviously this class is not mentioned
>>>>>>>>>> anywhere, neither in
>>>>>>>>>> >>>> my project nor in Flink...
>>>>>>>>>> >>>> Any help is appreciated!
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> Best,
>>>>>>>>>> >>>> Flavio
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> INFO  org.apache.flink.runtime.taskmanager.Task - CHAIN
>>>>>>>>>> GroupReduce
>>>>>>>>>> >>>> (GroupReduce at createResult(IndexMappingExecutor.java:42))
>>>>>>>>>> -> Map (Map at
>>>>>>>>>> >>>> main(Jsonizer.java:128)) (4/9) switched to FAILED with
>>>>>>>>>> exception.
>>>>>>>>>> >>>> java.lang.Exception: The data preparation for task 'CHAIN
>>>>>>>>>> GroupReduce
>>>>>>>>>> >>>> (GroupReduce at createResult(IndexMappingExecutor.java:42))
>>>>>>>>>> -> Map (Map at
>>>>>>>>>> >>>> main(Jsonizer.java:128))' , caused an error: Error obtaining
>>>>>>>>>> the sorted
>>>>>>>>>> >>>> input: Thread 'SortMerger spilling thread' terminated due to
>>>>>>>>>> an exception:
>>>>>>>>>> >>>> Unable to find class: java.ttil.HashSet
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>>>>>>>>>> >>>>         at
>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>> >>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>> >>>> Caused by: java.lang.RuntimeException: Error obtaining the
>>>>>>>>>> sorted input:
>>>>>>>>>> >>>> Thread 'SortMerger spilling thread' terminated due to an
>>>>>>>>>> exception: Unable
>>>>>>>>>> >>>> to find class: java.ttil.HashSet
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>>>>>>>>>> >>>>         ... 3 more
>>>>>>>>>> >>>> Caused by: java.io.IOException: Thread 'SortMerger spilling
>>>>>>>>>> thread'
>>>>>>>>>> >>>> terminated due to an exception: Unable to find class:
>>>>>>>>>> java.ttil.HashSet
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>>>>>>>> >>>> Caused by: com.esotericsoftware.kryo.KryoException: Unable
>>>>>>>>>> to find
>>>>>>>>>> >>>> class: java.ttil.HashSet
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>>>>>>>>> >>>>         at
>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>>> >>>> Caused by: java.lang.ClassNotFoundException:
>>>>>>>>>> java.ttil.HashSet
>>>>>>>>>> >>>>         at
>>>>>>>>>> java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>>>>> >>>>         at
>>>>>>>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>>>>> >>>>         at
>>>>>>>>>> java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>>>>> >>>>         at java.lang.Class.forName0(Native Method)
>>>>>>>>>> >>>>         at java.lang.Class.forName(Class.java:348)
>>>>>>>>>> >>>>         at
>>>>>>>>>> >>>>
>>>>>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>>>>>>>>>> >>>>         ... 14 more
>>>>>>>>>> >>>>
>>>>>>>>>> >>>
>>>>>>>>>> >>
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>>
>>>>
>>>
>>
>
>
