Flavio Pompermaier created FLINK-4719:
-----------------------------------------

             Summary: KryoSerializer random exception
                 Key: FLINK-4719
                 URL: https://issues.apache.org/jira/browse/FLINK-4719
             Project: Flink
          Issue Type: Bug
          Components: Core
    Affects Versions: 1.1.1
            Reporter: Flavio Pompermaier


There's a random exception that involves somehow the KryoSerializer when using 
POJOs in Flink jobs reading large volumes of data.

It is usually thrown in several places, e.g. (the Exceptions reported here can 
refer to previous versions of Flink...):

{code}
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
(GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at 
main(Jsonizer.java:128))' , caused an error: Error obtaining the sorted input: 
Thread 'SortMerger spilling thread' terminated due to an exception: Unable to 
find class: java.ttil.HashSet
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
        at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 
'SortMerger spilling thread' terminated due to an exception: Unable to find 
class: java.ttil.HashSet
        at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at 
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
        at 
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated 
due to an exception: Unable to find class: java.ttil.HashSet
        at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 
java.ttil.HashSet
        at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
        at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
        at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
        at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
        at 
org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
{code}

{code}
Caused by: java.io.IOException: Serializer consumed more bytes than the record 
had. This indicates broken serialization. If you are using custom serialization 
types (Value or Writable), check their serialization methods. If you are using 
a Kryo-serialized type, check the corresponding Kryo serializer.
    at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
    at 
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
    at 
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at 
org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
    at java.util.ArrayList.elementData(ArrayList.java:418)
    at java.util.ArrayList.get(ArrayList.java:431)
    at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
    at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
    at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
    at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
{code}

{code}
java.lang.RuntimeException: Cannot instantiate class.
        at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
        at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
        at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
        at 
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
        at 
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
        at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
        at 
org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
        at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: 
it.okkam.flink.test.model.pojo.VdhicleEvent
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
        ... 10 more
{code}

{code}
com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
        at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
        at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
        at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
        at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
        at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
        at 
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
        at 
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
        at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
        at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
        at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)

{code}

{code}
Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
        at java.util.ArrayList.elementData(ArrayList.java:418)
        at java.util.ArrayList.get(ArrayList.java:431)
        at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
        at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
        at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
        at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
        at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
        at 
org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
        at 
org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
        at 
org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
        at 
org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:45)
        at 
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)
        at 
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
        at 
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
        at 
org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:64)
        at 
org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
        at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to