Hey Ali, thanks for sharing the code. I assume that the custom ProtocolEvent, ProtocolDetailMap, and Subscriber types are all POJOs. They should not be a problem. I think this is a bug in Flink 0.9.1.
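For reference, here is a minimal sketch of the shape Flink generally expects from a POJO type: a public class with a public no-argument constructor, whose fields are either public or reachable through a public getter and setter. ExampleEvent and its fields are hypothetical names made up for illustration; they are not taken from the attached code.

```java
// Hypothetical type for illustration only; not one of the classes from the attached zip.
public class ExampleEvent {
    // Public, non-final field: can be read and written directly.
    public String subscriberId;

    // Private field, exposed through a public getter and setter pair.
    private long timestamp;

    // Public no-argument constructor, so an instance can be created
    // before its fields are filled in during deserialization.
    public ExampleEvent() {}

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public static void main(String[] args) {
        ExampleEvent event = new ExampleEvent();
        event.subscriberId = "subscriber-1";
        event.setTimestamp(42L);
        System.out.println(event.subscriberId + " @ " + event.getTimestamp());
    }
}
```

If your types look roughly like this, they should be handled by the PojoSerializer that shows up in your stack trace.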
Is it possible to re-run your program with the upcoming 0.10.0 (RC8) version and report back?

1) Add https://repository.apache.org/content/repositories/orgapacheflink-1055 as a snapshot repository

<repositories>
  <repository>
    <id>apache.snapshots</id>
    <name>Apache Development Snapshot Repository</name>
    <url>https://repository.apache.org/content/repositories/orgapacheflink-1055</url>
    <releases>
      <enabled>false</enabled>
    </releases>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

2) Set the Flink dependency version to 0.10.0

3) Use the Flink binary matching your Hadoop installation from here: http://people.apache.org/~mxm/flink-0.10.0-rc8/ (since you use Java, you can go with the Scala 2.10 builds)

Sorry for the inconvenience! The release is about to be finished (the voting process is already going on).

– Ufuk

On Tue, Nov 10, 2015 at 8:05 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:

> Thanks for the quick reply guys! A lot of interest in this one. I've
> attached the source code. There are other supporting modules/classes
> but the main flink component is in the included zip file.
>
> In answer to Fabian's question: I'm using the 0.9.1 release right off the
> website (flink-0.9.1-bin-hadoop1.tgz).
>
> In answer to Ufuk's question: Yes I'm using custom data types.
>
> Thanks,
> Ali
>
> On 2015-11-10, 3:01 PM, "Ufuk Celebi" <u...@apache.org> wrote:
>
> >Thanks for reporting this. Are you using any custom data types?
> >
> >If you can share your code, it would be very helpful in order to debug
> >this.
> >
> > Ufuk
> >
> >On Tuesday, 10 November 2015, Fabian Hueske <fhue...@gmail.com> wrote:
> >
> >> I agree with Robert. Looks like a bug in Flink.
> >> Maybe an off-by-one issue (violating index is 32768 and the default
> >> memory segment size is 32KB).
> >>
> >> Which Flink version are you using?
> >> In case you are using a custom build, can you share the commit ID (is
> >> reported in the first lines of the JobManager log file)?
> >>
> >> Thanks, Fabian
> >>
> >> 2015-11-10 18:29 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
> >>
> >> > Hi Ali,
> >> >
> >> > this could be a bug in Flink.
> >> > Can you share the code of your program with us to debug the issue?
> >> >
> >> > On Tue, Nov 10, 2015 at 6:25 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
> >> >
> >> > > Hello,
> >> > >
> >> > > I'm getting this error while running a streaming module on a cluster
> >> > > of 3 nodes:
> >> > >
> >> > > java.lang.ArrayIndexOutOfBoundsException: 32768
> >> > >     at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:178)
> >> > >     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:214)
> >> > >     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:219)
> >> > >     at org.apache.flink.types.StringValue.readString(StringValue.java:764)
> >> > >     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:68)
> >> > >     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
> >> > >     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> >> > >     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
> >> > >     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
> >> > >     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
> >> > >     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> >> > >     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
> >> > >     at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
> >> > >     at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
> >> > >     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> >> > >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
> >> > >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
> >> > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >> > >     at java.lang.Thread.run(Thread.java:745)
> >> > >
> >> > > Here's the configuration for each node:
> >> > >
> >> > > jobmanager.heap.mb: 2048
> >> > > taskmanager.heap.mb: 4096
> >> > > taskmanager.numberOfTaskSlots: 5
> >> > >
> >> > > I'm not even sure where to start with this one so any help is
> >> > > appreciated.
> >> > >
> >> > > Thanks,
> >> > > Ali
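
To illustrate the off-by-one guess from the thread (violating index 32768, default memory segment size 32KB), here is a small plain-Java sketch. It does not use or reproduce Flink's MemorySegment; SegmentBoundsDemo, SEGMENT_SIZE and isValidIndex are hypothetical names, and the only assumption is a buffer of 32 * 1024 bytes. The last valid index in such a buffer is 32767, so a read at 32768 is exactly one position past the end.

```java
// Plain-Java illustration; this is not Flink's MemorySegment implementation.
public class SegmentBoundsDemo {
    // Assumed buffer size: 32 KB = 32768 bytes, as mentioned in the thread.
    static final int SEGMENT_SIZE = 32 * 1024;

    // An index is valid only if it lies in [0, SEGMENT_SIZE - 1].
    static boolean isValidIndex(int index) {
        return index >= 0 && index < SEGMENT_SIZE;
    }

    // Reads one byte and reports whether the access stayed inside the buffer.
    static boolean tryRead(byte[] segment, int index) {
        try {
            byte unused = segment[index];
            return true;
        } catch (ArrayIndexOutOfBoundsException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] segment = new byte[SEGMENT_SIZE];
        System.out.println("last valid index: " + (SEGMENT_SIZE - 1));
        System.out.println("read at 32767 ok: " + tryRead(segment, SEGMENT_SIZE - 1));
        System.out.println("read at 32768 ok: " + tryRead(segment, SEGMENT_SIZE));
    }
}
```

A reader that advances its position to the segment size and then reads one more byte would hit this exception, which is consistent with the readByte frame at the top of the trace, though the sketch does not show where in Flink that happens.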