Ah, no problem. Glad you could resolve your problem :-) Thanks for reporting back.
Cheers, Fabian 2015-11-12 17:42 GMT+01:00 Kashmar, Ali <ali.kash...@emc.com>: > So the problem wasn’t in Flink after all. It turns out the data I was > receiving at the socket was not complete. So I went back and looked at the > way I’m sending data to the socket and realized that the socket is closed > before sending all data. I just needed to flush the stream before closing > the socket. I don’t see any more serialization errors. > > Thanks everyone for the help and I apologize if I wasted your time with > this. I will stick with 0.9.1 for now but I’ll download and use 0.10 as > soon as it’s released. > > Cheers, > Ali > > On 2015-11-11, 6:00 PM, "Fabian Hueske" <fhue...@gmail.com> wrote: > > >Hi Ali, > > > >Flink uses different serializers for different data types. For example, > >(boxed) primitives are serialized using dedicated serializers > >(IntSerializer, StringSerializer, etc.) and the ProtocolEvent class is > >recognized as a Pojo type and therefore serialized using Flink's > >PojoSerializer. > >Types that cannot be (fully) analyzed are handled as GenericTypes and > >serialized using Flink's KryoSerializer. > > > >By forcing Kryo serialization as I suggested before, Pojo types (such as > >ProtocolEvent) will be serialized with Kryo instead of Flink's > >PojoSerializer. > >Hence, forcing Kryo only affects Pojo types. GenericTypes (such as > >ProtocolAttributeMap and ProtocolDetailMap) are always handled by Kryo > >(also without forcing it). > > > >The exceptions you are facing might be caused by a bug in the > >KryoSerializer that we recently fixed (see FLINK-2800 [1]). This bug > >basically corrupts the stream of serialized data and might very well also > >be responsible for the original exception you posted. As you see from the > >JIRA issue, a bug fix was merged to all active branches however it is not > >yet contained in an official release. > > > >I would recommend you to try the latest candidate of the upcoming 0.10 > >release [2] or build Flink from the 0.9-release branch [3]. > > > >Please let me know if you have any questions or still facing problems when > >switching to version with a fix for FLINK-2800. > > > >Best, Fabian > > > >[1] https://issues.apache.org/jira/browse/FLINK-2800 > >[2] http://people.apache.org/~mxm/flink-0.10.0-rc8/ > >[3] https://github.com/apache/flink/tree/release-0.9 > > > >2015-11-11 17:20 GMT+01:00 Kashmar, Ali <ali.kash...@emc.com>: > > > >> Fabian, > >> > >> I tried running it again and I noticed there were some more exceptions > >>in > >> the log. I fixed those and I don’t see the original error but I do see > >> other ArrayIndexOutofBoundExceptions in the Kryo serializer code (I > >>didn’t > >> even enable that yet like you suggested). Examples: > >> > >> 1) > >> > >> 10:49:36,331 ERROR org.apache.flink.streaming.api.collector.StreamOutput > >> - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: > >>255 > >> at > >> > >>com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectI > >>nt > >> Map.java:364) > >> at > >> > >>com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceRes > >>ol > >> ver.java:47) > >> at com.esotericsoftware.kryo.Kryo.reset(Kryo.java:836) > >> at > >> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:601) > >> at > >> > >>com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.j > >>av > >> a:95) > >> at > >> > >>com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.j > >>av > >> a:21) > >> at > >> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) > >> at > >> > >>org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize > >>(K > >> ryoSerializer.java:186) > >> at > >> > >>org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(Pojo > >>Se > >> rializer.java:372) > >> at > >> > >> > org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.se > >>ri > >> alize(StreamRecordSerializer.java:89) > >> at > >> > >> > org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.se > >>ri > >> alize(StreamRecordSerializer.java:29) > >> at > >> > >>org.apache.flink.runtime.plugable.SerializationDelegate.write(Serializati > >>on > >> Delegate.java:51) > >> at > >> > >>org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSeria > >>li > >> zer.addRecord(SpanningRecordSerializer.java:76) > >> at > >> > >>org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWr > >>it > >> er.java:83) > >> at > >> > >>org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecor > >>dW > >> riter.java:58) > >> at > >> > >>org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutpu > >>t. > >> java:62) > >> at > >> > >>org.apache.flink.streaming.api.collector.CollectorWrapper.collect(Collect > >>or > >> Wrapper.java:40) > >> at > >> > >>org.apache.flink.streaming.api.operators.StreamSource$1.collect(StreamSou > >>rc > >> e.java:40) > >> at > >> com.emc.ngen.analytics.flink.job.ClientTask.run(ClientTask.java:36) > >> at > >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > >> at java.util.concurrent.FutureTask.run(FutureTask.java:266) > >> at > >> > >>java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java > >>:1 > >> 142) > >> at > >> > >>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.jav > >>a: > >> 617) > >> at java.lang.Thread.run(Thread.java:745) > >> > >> > >> > >> 2) > >> 10:49:36,333 ERROR org.apache.flink.streaming.api.collector.StreamOutput > >> - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: > >>334 > >> at > >> > >>com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIn > >>tM > >> ap.java:207) > >> at > >> > >>com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectInt > >>Ma > >> p.java:117) > >> at > >> > >>com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapR > >>ef > >> erenceResolver.java:23) > >> at > >> com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:629) > >> at > >> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:594) > >> at > >> > >>com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.j > >>av > >> a:88) > >> at > >> > >>com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.j > >>av > >> a:21) > >> at > >> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) > >> at > >> > >>org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize > >>(K > >> ryoSerializer.java:186) > >> at > >> > >>org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(Pojo > >>Se > >> rializer.java:372) > >> at > >> > >> > org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.se > >>ri > >> alize(StreamRecordSerializer.java:89) > >> at > >> > >> > org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.se > >>ri > >> alize(StreamRecordSerializer.java:29) > >> at > >> > >>org.apache.flink.runtime.plugable.SerializationDelegate.write(Serializati > >>on > >> Delegate.java:51) > >> at > >> > >>org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSeria > >>li > >> zer.addRecord(SpanningRecordSerializer.java:76) > >> at > >> > >>org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWr > >>it > >> er.java:83) > >> at > >> > >>org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecor > >>dW > >> riter.java:58) > >> at > >> > >>org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutpu > >>t. > >> java:62) > >> at > >> > >>org.apache.flink.streaming.api.collector.CollectorWrapper.collect(Collect > >>or > >> Wrapper.java:40) > >> at > >> > >>org.apache.flink.streaming.api.operators.StreamSource$1.collect(StreamSou > >>rc > >> e.java:40) > >> at > >> com.emc.ngen.analytics.flink.job.ClientTask.run(ClientTask.java:36) > >> at > >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > >> at java.util.concurrent.FutureTask.run(FutureTask.java:266) > >> at > >> > >>java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java > >>:1 > >> 142) > >> at > >> > >>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.jav > >>a: > >> 617) > >> at java.lang.Thread.run(Thread.java:745) > >> > >> 3) > >> > >> com.esotericsoftware.kryo.KryoException: Encountered unregistered class > >> ID: 106 > >> at > >> > >>com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClas > >>sR > >> esolver.java:119) > >> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) > >> at > >>com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) > >> at > >> > >>com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.ja > >>va > >> :135) > >> at > >> > >>com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.ja > >>va > >> :21) > >> at > >>com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > >> at > >> > >>org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deseriali > >>ze > >> (KryoSerializer.java:211) > >> at > >> > >>org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deseriali > >>ze > >> (KryoSerializer.java:225) > >> at > >> > >>org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(Po > >>jo > >> Serializer.java:499) > >> at > >> > >> > org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.de > >>se > >> rialize(StreamRecordSerializer.java:102) > >> at > >> > >> > org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.de > >>se > >> rialize(StreamRecordSerializer.java:29) > >> at > >> > >>org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(Reu > >>si > >> ngDeserializationDelegate.java:57) > >> at > >> > >>org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpa > >>nn > >> > >>ingRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeseria > >>li > >> zer.java:110) > >> at > >> > >>org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNe > >>xt > >> Record(StreamingAbstractRecordReader.java:80) > >> at > >> > >>org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(S > >>tr > >> eamingMutableRecordReader.java:36) > >> at > >> > >>org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterato > >>r. > >> java:59) > >> at > >> > >>org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneI > >>np > >> utStreamTask.java:68) > >> at > >> > >>org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInp > >>ut > >> StreamTask.java:101) > >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > >> at java.lang.Thread.run(Thread.java:745) > >> > >> 4) > >> > >> java.lang.IllegalArgumentException: You can store only Strings, Integer > >> and Longs in the ProtocolDetailMap, not: 'false' for 'null' > >> at > >> > >>io.pivotal.rti.protocols.ProtocolDetailMap.put(ProtocolDetailMap.java:100 > >>) > >> at > >> > >>io.pivotal.rti.protocols.ProtocolDetailMap.put(ProtocolDetailMap.java:23) > >> at > >> > >>com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.ja > >>va > >> :144) > >> at > >> > >>com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.ja > >>va > >> :21) > >> at > >>com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > >> at > >> > >>org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deseriali > >>ze > >> (KryoSerializer.java:211) > >> at > >> > >>org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deseriali > >>ze > >> (KryoSerializer.java:225) > >> at > >> > >>org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(Po > >>jo > >> Serializer.java:499) > >> at > >> > >> > org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.de > >>se > >> rialize(StreamRecordSerializer.java:102) > >> at > >> > >> > org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.de > >>se > >> rialize(StreamRecordSerializer.java:29) > >> at > >> > >>org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(Reu > >>si > >> ngDeserializationDelegate.java:57) > >> at > >> > >>org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpa > >>nn > >> > >>ingRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeseria > >>li > >> zer.java:110) > >> at > >> > >>org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNe > >>xt > >> Record(StreamingAbstractRecordReader.java:80) > >> at > >> > >>org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(S > >>tr > >> eamingMutableRecordReader.java:36) > >> at > >> > >>org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterato > >>r. > >> java:59) > >> at > >> > >>org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneI > >>np > >> utStreamTask.java:68) > >> at > >> > >>org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInp > >>ut > >> StreamTask.java:101) > >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > >> at java.lang.Thread.run(Thread.java:745) > >> > >> 5) > >> > >> > >> java.lang.IndexOutOfBoundsException: Index: 85, Size: 9 > >> at java.util.ArrayList.rangeCheck(ArrayList.java:653) > >> at java.util.ArrayList.get(ArrayList.java:429) > >> at > >> > >>com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapRefe > >>re > >> nceResolver.java:42) > >> at > >> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) > >> at > >>com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759) > >> at > >> > >>com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.ja > >>va > >> :135) > >> at > >> > >>com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.ja > >>va > >> :21) > >> at > >>com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > >> at > >> > >>org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deseriali > >>ze > >> (KryoSerializer.java:211) > >> at > >> > >>org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deseriali > >>ze > >> (KryoSerializer.java:225) > >> at > >> > >>org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(Po > >>jo > >> Serializer.java:499) > >> at > >> > >> > org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.de > >>se > >> rialize(StreamRecordSerializer.java:102) > >> at > >> > >> > org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.de > >>se > >> rialize(StreamRecordSerializer.java:29) > >> at > >> > >>org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(Reu > >>si > >> ngDeserializationDelegate.java:57) > >> at > >> > >>org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpa > >>nn > >> > >>ingRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeseria > >>li > >> zer.java:110) > >> at > >> > >>org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNe > >>xt > >> Record(StreamingAbstractRecordReader.java:80) > >> at > >> > >>org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(S > >>tr > >> eamingMutableRecordReader.java:36) > >> at > >> > >>org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterato > >>r. > >> java:59) > >> at > >> > >>org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneI > >>np > >> utStreamTask.java:68) > >> at > >> > >>org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInp > >>ut > >> StreamTask.java:101) > >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > >> at java.lang.Thread.run(Thread.java:745) > >> > >> > >> > >> As you can tell, there’s a common theme there which is the MapSerializer > >> in Kryo. The source of my grief seems to be the two java.util.Map > >> implementations ProtocolAttributeMap and ProtocolDetailMap. They’re > >>custom > >> implementations and they’re anal about what types of objects you can > >>have > >> as values. > >> > >> > >> Here’s the output of the gist you asked me to run: > >> > >> class org.apache.flink.api.java.typeutils.PojoTypeInfo : class > >> io.pivotal.rti.protocols.ProtocolEvent > >> ( > >> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class > >> java.lang.String > >> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class > >> java.lang.Long > >> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class > >> java.lang.Long > >> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class > >> java.lang.Long > >> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class > >> java.lang.String > >> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class > >> java.lang.String > >> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class > >> java.lang.String > >> class org.apache.flink.api.java.typeutils.GenericTypeInfo : class > >> io.pivotal.rti.protocols.ProtocolAttributeMap > >> class org.apache.flink.api.java.typeutils.GenericTypeInfo : class > >> io.pivotal.rti.protocols.ProtocolDetailMap > >> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class > >> java.lang.String > >> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class > >> java.lang.Short > >> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class > >> java.lang.String > >> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class > >> java.lang.String > >> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class > >> java.lang.String > >> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class > >> java.lang.Short > >> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class > >> java.lang.String > >> ) > >> > >> > >> Right now, I’m using the gson library to convert the ProtocolEvent > >> instance to JSON and back. I think I have to write a custom converter to > >> make ProtocolEvent a proper POJO in order for it to work with the > >> serializers in Flink. > >> > >> Sorry for the long email. > >> > >> Thanks, > >> Ali > >> > >> > >> On 2015-11-11, 10:02 AM, "Fabian Hueske" <fhue...@gmail.com> wrote: > >> > >> >Hi Ali, > >> > > >> >I looked into this issue. This problem seems to be caused because the > >> >deserializer reads more data than it should read. > >> >This might happen because of two reasons: > >> > 1) the meta information of how much data is safe to read is > >>incorrect. > >> > 2) the serializer and deserializer logic are not in sync which can > >>cause > >> >the deserializer to read more data than the serializer wrote. > >> > > >> >The first case is less likely: Flink writes the binary length of each > >> >record in front of its serialized representation. This happens whenever > >> >data is sent over the network, regardless of the data type. A bug in > >>this > >> >part would be very crucial, but is also less likely because this > >>happens > >> >very often and has not occurred yet. > >> > > >> >IMO, this looks like an issue of the serialization logic. Looking at > >>your > >> >code, the problem occurs when deserializing ProtocolEvent objects. > >> >Is it possible that you share this class with me? > >> > > >> >If it is not possible to share the class, it would be good, to know the > >> >field types of the Pojo and the associated TypeInformation. > >> >For that you can run the code in this gist [1] which will recursively > >> >print > >> >the field types and their TypeInformation. > >> > > >> >As a temporal workaround, you can try to use Kryo to serialize and > >> >deserialize your Pojos as follows: > >> >ExecutionEnvironment env = ... > >> >env.getConfig().enableForceKryo(); > >> > > >> >Best, > >> >Fabian > >> > > >> >[1] https://gist.github.com/fhueske/6c5aa386fc79ab69712b > >> > > >> >2015-11-11 10:38 GMT+01:00 Fabian Hueske <fhue...@gmail.com>: > >> > > >> >> Hi Ali, > >> >> > >> >> one more thing. Did that error occur once or is it reproducable? > >> >> > >> >> Thanks for your help, > >> >> Fabian > >> >> > >> >> 2015-11-11 9:50 GMT+01:00 Ufuk Celebi <u...@apache.org>: > >> >> > >> >>> Hey Ali, > >> >>> > >> >>> thanks for sharing the code. I assume that the custom > >> >>> ProtocolEvent, ProtocolDetailMap, Subscriber type are all PoJos. > >>They > >> >>> should not be a problem. I think this is a bug in Flink 0.9.1. > >> >>> > >> >>> Is it possible to re-run your program with the upcoming 0.10.0 (RC8) > >> >>> version and report back? > >> >>> > >> >>> 1) Add > >> >>> > >>https://repository.apache.org/content/repositories/orgapacheflink-1055 > >> >>> as a > >> >>> snapshot repository > >> >>> > >> >>> <repositories> > >> >>> <repository> > >> >>> <id>apache.snapshots</id> > >> >>> <name>Apache Development Snapshot Repository</name> > >> >>> <url> > >> >>> > >>https://repository.apache.org/content/repositories/orgapacheflink-1055 > >> >>> </url> > >> >>> <releases> > >> >>> <enabled>false</enabled> > >> >>> </releases> > >> >>> <snapshots> > >> >>> <enabled>true</enabled> > >> >>> </snapshots> > >> >>> </repository> > >> >>> </repositories> > >> >>> > >> >>> 2) Set the Flink dependency version to 0.10.0 > >> >>> > >> >>> 3) Use the Flink binary matching your Hadoop installation from here: > >> >>> http://people.apache.org/~mxm/flink-0.10.0-rc8/ (since you use > Java, > >> >>>you > >> >>> can go with the Scala 2.10 builds) > >> >>> > >> >>> Sorry for the inconvenience! The release is about to be finished > >>(the > >> >>> voting process is already going on). > >> >>> > >> >>> Ufuk > >> >>> > >> >>> > >> >>> On Tue, Nov 10, 2015 at 8:05 PM, Kashmar, Ali <ali.kash...@emc.com> > >> >>> wrote: > >> >>> > >> >>> > Thanks for the quick reply guys! A lot of interest in this one. > >>I¹ve > >> >>> > attached the source code is attached. There are other supporting > >> >>> > modules/classes but the main flink component is in the included > >>zip > >> >>> file. > >> >>> > > >> >>> > In answer to Fabian¹s question: I¹m using the 0.9.1 release right > >>off > >> >>> the > >> >>> > website (flink-0.9.1-bin-hadoop1.tgz). > >> >>> > > >> >>> > In answer to Ufuk¹s question: Yes I¹m using custom data types. > >> >>> > > >> >>> > Thanks, > >> >>> > Ali > >> >>> > > >> >>> > > >> >>> > > >> >>> > On 2015-11-10, 3:01 PM, "Ufuk Celebi" <u...@apache.org> wrote: > >> >>> > > >> >>> > >Thanks for reporting this. Are you using any custom data types? > >> >>> > > > >> >>> > >If you can share your code, it would be very helpful in order to > >> >>>debug > >> >>> > >this. > >> >>> > > > >> >>> > > Ufuk > >> >>> > > > >> >>> > >On Tuesday, 10 November 2015, Fabian Hueske <fhue...@gmail.com> > >> >>>wrote: > >> >>> > > > >> >>> > >> I agree with Robert. Looks like a bug in Flink. > >> >>> > >> Maybe an off-by-one issue (violating index is 32768 and the > >> >>>default > >> >>> > >>memory > >> >>> > >> segment size is 32KB). > >> >>> > >> > >> >>> > >> Which Flink version are you using? > >> >>> > >> In case you are using a custom build, can you share the commit > >>ID > >> >>>(is > >> >>> > >> reported in the first lines of the JobManager log file)? > >> >>> > >> > >> >>> > >> Thanks, Fabian > >> >>> > >> > >> >>> > >> 2015-11-10 18:29 GMT+01:00 Robert Metzger <rmetz...@apache.org > >> >>> > >> <javascript:;>>: > >> >>> > >> > >> >>> > >> > Hi Ali, > >> >>> > >> > > >> >>> > >> > this could be a bug in Flink. > >> >>> > >> > Can you share the code of your program with us to debug the > >> >>>issue? > >> >>> > >> > > >> >>> > >> > On Tue, Nov 10, 2015 at 6:25 PM, Kashmar, Ali > >> >>><ali.kash...@emc.com > >> >>> > >> <javascript:;>> wrote: > >> >>> > >> > > >> >>> > >> > > Hello, > >> >>> > >> > > > >> >>> > >> > > I¹m getting this error while running a streaming module on > >>a > >> >>> cluster > >> >>> > >> of 3 > >> >>> > >> > > nodes: > >> >>> > >> > > > >> >>> > >> > > > >> >>> > >> > > java.lang.ArrayIndexOutOfBoundsException: 32768 > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > >> >>> > >>org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:178) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>>>org.apache.flink.runtime.io.network.api.serialization.SpillingAdapti > >>>>>>>ve > >> >>>>>Spa > >> >>> > > >> >>> > >> > >>>>>>>nningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptive > >>>>>>>Sp > >> >>>>>ann > >> >>> > >>ingRecordDeserializer.java:214) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>>>org.apache.flink.runtime.io.network.api.serialization.SpillingAdapti > >>>>>>>ve > >> >>>>>Spa > >> >>> > > >> >>> > >> > >>>>>>>nningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(Spilling > >>>>>>>Ad > >> >>>>>apt > >> >>> > >>iveSpanningRecordDeserializer.java:219) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > > >>>>org.apache.flink.types.StringValue.readString(StringValue.java:764) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>>>org.apache.flink.api.common.typeutils.base.StringSerializer.deserial > >>>>>>>iz > >> >>>>>e(S > >> >>> > >>tringSerializer.java:68) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>>>org.apache.flink.api.common.typeutils.base.StringSerializer.deserial > >>>>>>>iz > >> >>>>>e(S > >> >>> > >>tringSerializer.java:73) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>>>org.apache.flink.api.common.typeutils.base.StringSerializer.deserial > >>>>>>>iz > >> >>>>>e(S > >> >>> > >>tringSerializer.java:28) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>>>org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deseriali > >>>>>>>ze > >> >>>>>(Po > >> >>> > >>joSerializer.java:499) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer > >>>>>.d > >> >>>e > >> >>> > >>serialize(StreamRecordSerializer.java:102) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer > >>>>>.d > >> >>>e > >> >>> > >>serialize(StreamRecordSerializer.java:29) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>>>org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.rea > >>>>>>>d( > >> >>>>>Reu > >> >>> > >>singDeserializationDelegate.java:57) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>>>org.apache.flink.runtime.io.network.api.serialization.SpillingAdapti > >>>>>>>ve > >> >>>>>Spa > >> >>> > > >> >>> > >> > >>>>>>>nningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecord > >>>>>>>De > >> >>>>>ser > >> >>> > >>ializer.java:110) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> >>>>> > >> org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.ge > >> >>>>>tNe > >> >>> > >>xtRecord(StreamingAbstractRecordReader.java:80) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>>>org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.n > >>>>>>>ex > >> >>>>>t(S > >> >>> > >>treamingMutableRecordReader.java:36) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>>>org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIt > >>>>>>>er > >> >>>>>ato > >> >>> > >>r.java:59) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>>>org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext > >>>>>>>(O > >> >>>>>neI > >> >>> > >>nputStreamTask.java:68) > >> >>> > >> > > > >> >>> > >> > > at > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > >> > >>>>>>>org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(O > >>>>>>>ne > >> >>>>>Inp > >> >>> > >>utStreamTask.java:101) > >> >>> > >> > > > >> >>> > >> > > at > >> >>>org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > >> >>> > >> > > > >> >>> > >> > > at java.lang.Thread.run(Thread.java:745) > >> >>> > >> > > > >> >>> > >> > > > >> >>> > >> > > Here¹s the configuration for each node: > >> >>> > >> > > > >> >>> > >> > > > >> >>> > >> > > jobmanager.heap.mb: 2048 > >> >>> > >> > > > >> >>> > >> > > taskmanager.heap.mb: 4096 > >> >>> > >> > > > >> >>> > >> > > taskmanager.numberOfTaskSlots: 5 > >> >>> > >> > > > >> >>> > >> > > > >> >>> > >> > > I¹m not even sure where to start with this one so any help > >>is > >> >>> > >> > appreciated. > >> >>> > >> > > > >> >>> > >> > > > >> >>> > >> > > Thanks, > >> >>> > >> > > > >> >>> > >> > > Ali > >> >>> > >> > > > >> >>> > >> > > >> >>> > >> > >> >>> > > >> >>> > > >> >>> > >> >> > >> >> > >> > >> > >