This seems unrelated to the KryoSerializer issue from recent
discussions [1], which has been fixed in 1.4.3, 1.5.0 and 1.6.0.
At a quick glance, this might be caused by a corrupted message in your
decoding step, for example a malformed JSON string.
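
To check whether malformed input is involved, one option is to guard the
decoding step so that a bad record is dropped (and ideally logged) instead
of being passed downstream. The following is only a minimal sketch:
SafeDecode, Decoder and tryDecode are hypothetical names, not part of Flink
or Jackson, and the decoder you pass in would be whatever parsing call your
map function currently makes.

```java
import java.util.Optional;

// Hypothetical helper for illustration; not part of Flink or Jackson.
final class SafeDecode {
    private SafeDecode() {}

    /** A decoder that may throw, for example a lambda wrapping a JSON parser call. */
    @FunctionalInterface
    interface Decoder<T> {
        T decode(String raw) throws Exception;
    }

    /**
     * Returns the decoded value, or Optional.empty() when the input is null,
     * the decoder throws, or the decoder returns null.
     */
    static <T> Optional<T> tryDecode(String raw, Decoder<T> decoder) {
        if (raw == null) {
            return Optional.empty();
        }
        try {
            return Optional.ofNullable(decoder.decode(raw));
        } catch (Exception e) {
            // Assumption: this is where the offending record would be logged
            // so it can be inspected later.
            return Optional.empty();
        }
    }
}
```

Used from a flatMap instead of a map, this lets you emit only the records
that decoded successfully. If the failure still occurs with all malformed
records filtered out, the input data is probably not the cause.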

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-8836

On Wed, Aug 22, 2018 at 8:41 AM Darshan Singh <darshan.m...@gmail.com>
wrote:

> Hi,
>
> I am using a map function on a data stream that has one column, a JSON
> string. The map function simply uses the Jackson mapper to convert the
> String to an ObjectNode and also assigns a key based on one of the values
> in the ObjectNode.
>
> The code works as expected for 2-3 minutes and then suddenly fails with
> the error below. I looked at the mailing list, and most of the reports
> say the issue was fixed in 1.5.0. I am using 1.6.0, so I am not sure what
> to do.
>
> I just wanted to know whether we need to write our own serializer for
> ObjectNode to fix this issue, or whether there is some setting we are
> missing.
>
> Thanks
>
> java.lang.IndexOutOfBoundsException: Index: 49, Size: 0
> at java.util.ArrayList.rangeCheck(ArrayList.java:657)
> at java.util.ArrayList.get(ArrayList.java:433)
> at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:207)
> at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
>
>
>
