Thanks. We ran into different errors and then realized it was an OOM issue that was causing different parts to fail. Flink was buffering too much data because we were reading too fast from the source. Reducing the read speed fixed the issue.
However, I am curious how to achieve the same with S3, apart from limiting the number of files read at the same time (one possible approach is sketched after the quoted thread below).

Thanks

On Sun, Aug 26, 2018 at 5:32 PM Rong Rong <walter...@gmail.com> wrote:

> This seems to be unrelated to the KryoSerializer issue in recent
> discussions [1], which has been fixed in 1.4.3, 1.5.0 and 1.6.0.
> On a quick glance, this might have been a corrupted message in your
> decoding, for example a malformed JSON string.
>
> --
> Rong
>
> [1] https://issues.apache.org/jira/browse/FLINK-8836
>
> On Wed, Aug 22, 2018 at 8:41 AM Darshan Singh <darshan.m...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am using a map function on a data stream which has one column, i.e. a
>> JSON string. The map function simply uses a Jackson mapper to convert the
>> String to an ObjectNode and also assigns a key based on one of the values
>> in the ObjectNode.
>>
>> The code seems to work fine for 2-3 minutes as expected and then suddenly
>> fails with the error below. I looked at the mailing list and most of the
>> issues mentioned that this was fixed in 1.5.0, and I am using 1.6.0, so I
>> am not sure what needs to be done.
>>
>> I just wanted to know whether we will need to write our own serializer
>> for ObjectNode to fix this issue, or whether there is some setting we are
>> missing.
>>
>> Thanks
>>
>> java.lang.IndexOutOfBoundsException: Index: 49, Size: 0
>>   at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>>   at java.util.ArrayList.get(ArrayList.java:433)
>>   at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>   at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:207)
>>   at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
>>   at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
>>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
>>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>   at java.lang.Thread.run(Thread.java:748)
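One way to throttle without limiting the number of files (just a sketch, not something from this thread, and it assumes Guava is on the classpath) is to put a rate limiter in a RichMapFunction right after the file source. The class name and the 1000 permits/second figure below are made up for illustration and would need tuning:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import com.google.common.util.concurrent.RateLimiter;

// Caps how many records per second each parallel subtask lets through,
// so downstream operators are not flooded while reading from S3.
public class ThrottlingMapper extends RichMapFunction<String, String> {

    private transient RateLimiter rateLimiter;

    @Override
    public void open(Configuration parameters) {
        // One limiter per subtask: the effective global rate is
        // permitsPerSecond * parallelism.
        rateLimiter = RateLimiter.create(1000.0);
    }

    @Override
    public String map(String value) {
        rateLimiter.acquire(); // blocks until a permit is available
        return value;
    }
}

You would then do something like env.readTextFile("s3://<bucket>/<path>").map(new ThrottlingMapper()) before the rest of the pipeline (the S3 path is a placeholder). If the map chains to the source, the blocking call slows the reader directly; otherwise back-pressure should propagate upstream.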
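On the ObjectNode question quoted above: if you do want to take Kryo's reflective handling of Jackson types out of the picture, one option is a custom Kryo serializer that round-trips the node through its JSON text. This is only a sketch under that assumption; the class name is made up, and the string round-trip costs some CPU:

import java.io.IOException;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Writes an ObjectNode as its JSON string instead of letting Kryo
// reflect over Jackson's internal fields.
public class ObjectNodeKryoSerializer extends Serializer<ObjectNode> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void write(Kryo kryo, Output output, ObjectNode node) {
        output.writeString(node.toString());
    }

    @Override
    public ObjectNode read(Kryo kryo, Input input, Class<ObjectNode> type) {
        try {
            return (ObjectNode) MAPPER.readTree(input.readString());
        } catch (IOException e) {
            throw new RuntimeException("Failed to deserialize ObjectNode", e);
        }
    }
}

It would be registered once on the environment, e.g. env.getConfig().addDefaultKryoSerializer(ObjectNode.class, ObjectNodeKryoSerializer.class);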