This seems to be irrelevant to the issue for KyroSerializer in recent discussions [1]. which has been fixed in 1.4.3, 1.5.0 and 1.6.0. On a quick glance, this might have been a corrupted message in your decoding, for example a malformed JSON string.
-- Rong [1] https://issues.apache.org/jira/browse/FLINK-8836 On Wed, Aug 22, 2018 at 8:41 AM Darshan Singh <darshan.m...@gmail.com> wrote: > Hi, > > I am using a map function on a data stream which has 1 column i.e. a json > string. Map function simply uses Jackson mapper and convert the String to > ObjectNode and also assign key based on one of the value in Object node. > > The code seems to work fine for 2-3 minutes as expected and then suddenly > it fails with below error. I looked at the mailing list and most of the > issues mentioned that it was fixed in 1.5.0 and I am using 1.6.0 so not > sure what needs to do. > > Just wanted to know if we will need to write our own Serializer for > ObjectNode to fix this issue or there is some setting we are missing. > > Thanks > > ava.lang.IndexOutOfBoundsException: Index: 49, Size: 0 > at java.util.ArrayList.rangeCheck(ArrayList.java:657) > at java.util.ArrayList.get(ArrayList.java:433) > at > com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) > at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:207) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49) > at > org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > > >