Hey Ali,

thanks for sharing the code. I assume that the custom
ProtocolEvent, ProtocolDetailMap, and Subscriber types are all POJOs. They
should not be a problem. I think this is a bug in Flink 0.9.1.

Is it possible to re-run your program with the upcoming 0.10.0 (RC8)
version and report back?

1) Add
https://repository.apache.org/content/repositories/orgapacheflink-1055 as a
snapshot repository

<repositories>
  <repository>
    <id>apache.snapshots</id>
    <name>Apache Development Snapshot Repository</name>
    <url>https://repository.apache.org/content/repositories/orgapacheflink-1055</url>
    <releases>
      <enabled>false</enabled>
    </releases>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>

2) Set the Flink dependency version to 0.10.0

3) Use the Flink binary matching your Hadoop installation from here:
http://people.apache.org/~mxm/flink-0.10.0-rc8/ (since you use Java, you
can go with the Scala 2.10 builds)
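
For step 2, a minimal sketch of the version change, assuming your pom.xml declares the Flink dependencies directly. The flink.version property and the two artifacts shown are only illustrative assumptions; keep whatever Flink artifacts your project already lists and just change their version:

```xml
<properties>
  <!-- hypothetical property name, used to keep all Flink artifacts on one version -->
  <flink.version>0.10.0</flink.version>
</properties>

<dependencies>
  <!-- example artifacts only; your own dependency list may differ -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>
```

Running mvn dependency:tree afterwards is a quick way to check that no 0.9.1 artifacts are still being pulled in.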

Sorry for the inconvenience! The release is about to be finished (the
voting process is already going on).

– Ufuk


On Tue, Nov 10, 2015 at 8:05 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:

> Thanks for the quick reply guys! A lot of interest in this one. I've
> attached the source code. There are other supporting
> modules/classes but the main Flink component is in the included zip file.
>
> In answer to Fabian's question: I'm using the 0.9.1 release right off the
> website (flink-0.9.1-bin-hadoop1.tgz).
>
> In answer to Ufuk's question: Yes, I'm using custom data types.
>
> Thanks,
> Ali
>
>
>
> On 2015-11-10, 3:01 PM, "Ufuk Celebi" <u...@apache.org> wrote:
>
> >Thanks for reporting this. Are you using any custom data types?
> >
> >If you can share your code, it would be very helpful in order to debug
> >this.
> >
> >– Ufuk
> >
> >On Tuesday, 10 November 2015, Fabian Hueske <fhue...@gmail.com> wrote:
> >
> >> I agree with Robert. Looks like a bug in Flink.
> >> Maybe an off-by-one issue (violating index is 32768 and the default memory
> >> segment size is 32KB).
> >>
> >> Which Flink version are you using?
> >> In case you are using a custom build, can you share the commit ID (it is
> >> reported in the first lines of the JobManager log file)?
> >>
> >> Thanks, Fabian
> >>
> >> 2015-11-10 18:29 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
> >>
> >> > Hi Ali,
> >> >
> >> > this could be a bug in Flink.
> >> > Can you share the code of your program with us to debug the issue?
> >> >
> >> > On Tue, Nov 10, 2015 at 6:25 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
> >> >
> >> > > Hello,
> >> > >
> >> > > I'm getting this error while running a streaming module on a cluster of 3
> >> > > nodes:
> >> > >
> >> > >
> >> > > java.lang.ArrayIndexOutOfBoundsException: 32768
> >> > >   at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:178)
> >> > >   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:214)
> >> > >   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:219)
> >> > >   at org.apache.flink.types.StringValue.readString(StringValue.java:764)
> >> > >   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:68)
> >> > >   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
> >> > >   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> >> > >   at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
> >> > >   at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
> >> > >   at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
> >> > >   at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> >> > >   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
> >> > >   at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
> >> > >   at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
> >> > >   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> >> > >   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
> >> > >   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
> >> > >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >> > >   at java.lang.Thread.run(Thread.java:745)
> >> > >
> >> > >
> >> > > Here's the configuration for each node:
> >> > >
> >> > > jobmanager.heap.mb: 2048
> >> > > taskmanager.heap.mb: 4096
> >> > > taskmanager.numberOfTaskSlots: 5
> >> > >
> >> > > I'm not even sure where to start with this one so any help is
> >> > > appreciated.
> >> > >
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Ali
> >> > >
> >> >
> >>
>
>
