So the problem wasn’t in Flink after all. It turns out the data I was receiving at the socket was incomplete. I went back and looked at how I’m sending data to the socket and realized that the socket was being closed before all the data had been sent. I just needed to flush the stream before closing the socket. I no longer see any serialization errors.
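For anyone who finds this thread later, here is a minimal sketch of the kind of fix I mean. It is not the actual sender code (that isn't posted in this thread); the class name FlushBeforeClose, the methods writeRecords and send, and the one-record-per-line format are assumptions made for illustration. The point is only that a buffered writer wrapped around socket.getOutputStream() has to be flushed before the socket is closed, otherwise whatever is still sitting in the buffer never reaches the receiver.

```java
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical sender; all names here are illustrative only.
public class FlushBeforeClose {

    // Writes one record per line, then flushes so that nothing is left
    // in the BufferedWriter's internal buffer when this method returns.
    // The caller owns rawOut and is responsible for closing it.
    public static void writeRecords(OutputStream rawOut, List<String> records) throws IOException {
        Writer writer = new BufferedWriter(new OutputStreamWriter(rawOut, StandardCharsets.UTF_8));
        for (String record : records) {
            writer.write(record);
            writer.write('\n');
        }
        // Without this flush, closing the socket directly would drop
        // whatever is still buffered in the writer.
        writer.flush();
    }

    // Opens a connection, sends all records, and closes the socket only
    // after writeRecords has flushed (try-with-resources closes it on exit).
    public static void send(String host, int port, List<String> records) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            writeRecords(socket.getOutputStream(), records);
        }
    }

    public static void main(String[] args) throws IOException {
        // Demonstrates the write path against an in-memory stream
        // instead of a real socket.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        writeRecords(sink, Arrays.asList("{\"id\":1}", "{\"id\":2}"));
        System.out.println(sink.size() + " bytes reached the underlying stream");
    }
}
```

Closing the outermost writer instead of the raw socket would also work, since close() on a BufferedWriter flushes first; the explicit flush() just makes the ordering obvious.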
Thanks everyone for the help and I apologize if I wasted your time with this. I will stick with 0.9.1 for now but I’ll download and use 0.10 as soon as it’s released.

Cheers,
Ali

On 2015-11-11, 6:00 PM, "Fabian Hueske" <fhue...@gmail.com> wrote:

>Hi Ali,
>
>Flink uses different serializers for different data types. For example,
>(boxed) primitives are serialized using dedicated serializers
>(IntSerializer, StringSerializer, etc.) and the ProtocolEvent class is
>recognized as a Pojo type and therefore serialized using Flink's
>PojoSerializer.
>Types that cannot be (fully) analyzed are handled as GenericTypes and
>serialized using Flink's KryoSerializer.
>
>By forcing Kryo serialization as I suggested before, Pojo types (such as
>ProtocolEvent) will be serialized with Kryo instead of Flink's
>PojoSerializer.
>Hence, forcing Kryo only affects Pojo types. GenericTypes (such as
>ProtocolAttributeMap and ProtocolDetailMap) are always handled by Kryo
>(also without forcing it).
>
>The exceptions you are facing might be caused by a bug in the
>KryoSerializer that we recently fixed (see FLINK-2800 [1]). This bug
>basically corrupts the stream of serialized data and might very well also
>be responsible for the original exception you posted. As you see from the
>JIRA issue, a bug fix was merged to all active branches however it is not
>yet contained in an official release.
>
>I would recommend you to try the latest candidate of the upcoming 0.10
>release [2] or build Flink from the 0.9-release branch [3].
>
>Please let me know if you have any questions or still facing problems when
>switching to version with a fix for FLINK-2800.
>
>Best, Fabian
>
>[1] https://issues.apache.org/jira/browse/FLINK-2800
>[2] http://people.apache.org/~mxm/flink-0.10.0-rc8/
>[3] https://github.com/apache/flink/tree/release-0.9
>
>2015-11-11 17:20 GMT+01:00 Kashmar, Ali <ali.kash...@emc.com>:
>
>> Fabian,
>>
>> I tried running it again and I noticed there were some more exceptions in
>> the log.
>> I fixed those and I don’t see the original error, but I do see
>> other ArrayIndexOutOfBoundsExceptions in the Kryo serializer code (I didn’t
>> even enable that yet like you suggested). Examples:
>>
>> 1)
>>
>> 10:49:36,331 ERROR org.apache.flink.streaming.api.collector.StreamOutput - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: 255
>>     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectIntMap.java:364)
>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceResolver.java:47)
>>     at com.esotericsoftware.kryo.Kryo.reset(Kryo.java:836)
>>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:601)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
>>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:372)
>>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:89)
>>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:29)
>>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
>>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:58)
>>     at org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.java:62)
>>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>>     at org.apache.flink.streaming.api.operators.StreamSource$1.collect(StreamSource.java:40)
>>     at com.emc.ngen.analytics.flink.job.ClientTask.run(ClientTask.java:36)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> 2)
>>
>> 10:49:36,333 ERROR org.apache.flink.streaming.api.collector.StreamOutput - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: 334
>>     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:207)
>>     at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
>>     at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:629)
>>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:594)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
>>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:372)
>>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:89)
>>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:29)
>>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
>>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:58)
>>     at org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.java:62)
>>     at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>>     at org.apache.flink.streaming.api.operators.StreamSource$1.collect(StreamSource.java:40)
>>     at com.emc.ngen.analytics.flink.job.ClientTask.run(ClientTask.java:36)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> 3)
>>
>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 106
>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:225)
>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
>>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
>>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
>>     at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
>>     at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> 4)
>>
>> java.lang.IllegalArgumentException: You can store only Strings, Integer and Longs in the ProtocolDetailMap, not: 'false' for 'null'
>>     at io.pivotal.rti.protocols.ProtocolDetailMap.put(ProtocolDetailMap.java:100)
>>     at io.pivotal.rti.protocols.ProtocolDetailMap.put(ProtocolDetailMap.java:23)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:225)
>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
>>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
>>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
>>     at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
>>     at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> 5)
>>
>> java.lang.IndexOutOfBoundsException: Index: 85, Size: 9
>>     at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>>     at java.util.ArrayList.get(ArrayList.java:429)
>>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:225)
>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
>>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
>>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
>>     at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
>>     at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> As you can tell, there’s a common theme there which is the MapSerializer
>> in Kryo. The source of my grief seems to be the two java.util.Map
>> implementations ProtocolAttributeMap and ProtocolDetailMap. They’re custom
>> implementations and they’re anal about what types of objects you can have
>> as values.
>>
>> Here’s the output of the gist you asked me to run:
>>
>> class org.apache.flink.api.java.typeutils.PojoTypeInfo : class io.pivotal.rti.protocols.ProtocolEvent
>> (
>> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Long
>> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Long
>> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Long
>> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>> class org.apache.flink.api.java.typeutils.GenericTypeInfo : class io.pivotal.rti.protocols.ProtocolAttributeMap
>> class org.apache.flink.api.java.typeutils.GenericTypeInfo : class io.pivotal.rti.protocols.ProtocolDetailMap
>> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Short
>> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Short
>> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
>> )
>>
>> Right now, I’m using the gson library to convert the ProtocolEvent
>> instance to JSON and back. I think I have to write a custom converter to
>> make ProtocolEvent a proper POJO in order for it to work with the
>> serializers in Flink.
>>
>> Sorry for the long email.
>>
>> Thanks,
>> Ali
>>
>>
>> On 2015-11-11, 10:02 AM, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>
>> >Hi Ali,
>> >
>> >I looked into this issue. This problem seems to be caused because the
>> >deserializer reads more data than it should read.
>> >This might happen because of two reasons:
>> > 1) the meta information of how much data is safe to read is incorrect.
>> > 2) the serializer and deserializer logic are not in sync which can cause
>> >the deserializer to read more data than the serializer wrote.
>> >
>> >The first case is less likely: Flink writes the binary length of each
>> >record in front of its serialized representation. This happens whenever
>> >data is sent over the network, regardless of the data type. A bug in this
>> >part would be very crucial, but is also less likely because this happens
>> >very often and has not occurred yet.
>> >
>> >IMO, this looks like an issue of the serialization logic. Looking at your
>> >code, the problem occurs when deserializing ProtocolEvent objects.
>> >Is it possible that you share this class with me?
>> >
>> >If it is not possible to share the class, it would be good to know the
>> >field types of the Pojo and the associated TypeInformation.
>> >For that you can run the code in this gist [1] which will recursively
>> >print the field types and their TypeInformation.
>> >
>> >As a temporary workaround, you can try to use Kryo to serialize and
>> >deserialize your Pojos as follows:
>> >ExecutionEnvironment env = ...
>> >env.getConfig().enableForceKryo();
>> >
>> >Best,
>> >Fabian
>> >
>> >[1] https://gist.github.com/fhueske/6c5aa386fc79ab69712b
>> >
>> >2015-11-11 10:38 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>> >
>> >> Hi Ali,
>> >>
>> >> one more thing. Did that error occur once or is it reproducible?
>> >>
>> >> Thanks for your help,
>> >> Fabian
>> >>
>> >> 2015-11-11 9:50 GMT+01:00 Ufuk Celebi <u...@apache.org>:
>> >>
>> >>> Hey Ali,
>> >>>
>> >>> thanks for sharing the code.
>> >>> I assume that the custom
>> >>> ProtocolEvent, ProtocolDetailMap, Subscriber type are all PoJos. They
>> >>> should not be a problem. I think this is a bug in Flink 0.9.1.
>> >>>
>> >>> Is it possible to re-run your program with the upcoming 0.10.0 (RC8)
>> >>> version and report back?
>> >>>
>> >>> 1) Add
>> >>> https://repository.apache.org/content/repositories/orgapacheflink-1055
>> >>> as a snapshot repository
>> >>>
>> >>> <repositories>
>> >>>   <repository>
>> >>>     <id>apache.snapshots</id>
>> >>>     <name>Apache Development Snapshot Repository</name>
>> >>>     <url>https://repository.apache.org/content/repositories/orgapacheflink-1055</url>
>> >>>     <releases>
>> >>>       <enabled>false</enabled>
>> >>>     </releases>
>> >>>     <snapshots>
>> >>>       <enabled>true</enabled>
>> >>>     </snapshots>
>> >>>   </repository>
>> >>> </repositories>
>> >>>
>> >>> 2) Set the Flink dependency version to 0.10.0
>> >>>
>> >>> 3) Use the Flink binary matching your Hadoop installation from here:
>> >>> http://people.apache.org/~mxm/flink-0.10.0-rc8/ (since you use Java, you
>> >>> can go with the Scala 2.10 builds)
>> >>>
>> >>> Sorry for the inconvenience! The release is about to be finished (the
>> >>> voting process is already going on).
>> >>>
>> >>> Ufuk
>> >>>
>> >>>
>> >>> On Tue, Nov 10, 2015 at 8:05 PM, Kashmar, Ali <ali.kash...@emc.com>
>> >>> wrote:
>> >>>
>> >>> > Thanks for the quick reply guys! A lot of interest in this one. I’ve
>> >>> > attached the source code. There are other supporting
>> >>> > modules/classes but the main flink component is in the included zip
>> >>> > file.
>> >>> >
>> >>> > In answer to Fabian’s question: I’m using the 0.9.1 release right off
>> >>> > the website (flink-0.9.1-bin-hadoop1.tgz).
>> >>> >
>> >>> > In answer to Ufuk’s question: Yes I’m using custom data types.
>> >>> >
>> >>> > Thanks,
>> >>> > Ali
>> >>> >
>> >>> >
>> >>> > On 2015-11-10, 3:01 PM, "Ufuk Celebi" <u...@apache.org> wrote:
>> >>> >
>> >>> > >Thanks for reporting this. Are you using any custom data types?
>> >>> > >
>> >>> > >If you can share your code, it would be very helpful in order to
>> >>> > >debug this.
>> >>> > >
>> >>> > > Ufuk
>> >>> > >
>> >>> > >On Tuesday, 10 November 2015, Fabian Hueske <fhue...@gmail.com> wrote:
>> >>> > >
>> >>> > >> I agree with Robert. Looks like a bug in Flink.
>> >>> > >> Maybe an off-by-one issue (violating index is 32768 and the
>> >>> > >> default memory segment size is 32KB).
>> >>> > >>
>> >>> > >> Which Flink version are you using?
>> >>> > >> In case you are using a custom build, can you share the commit ID
>> >>> > >> (is reported in the first lines of the JobManager log file)?
>> >>> > >>
>> >>> > >> Thanks, Fabian
>> >>> > >>
>> >>> > >> 2015-11-10 18:29 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>> >>> > >>
>> >>> > >> > Hi Ali,
>> >>> > >> >
>> >>> > >> > this could be a bug in Flink.
>> >>> > >> > Can you share the code of your program with us to debug the
>> >>> > >> > issue?
>> >>> > >> >
>> >>> > >> > On Tue, Nov 10, 2015 at 6:25 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>> >>> > >> >
>> >>> > >> > > Hello,
>> >>> > >> > >
>> >>> > >> > > I’m getting this error while running a streaming module on a
>> >>> > >> > > cluster of 3 nodes:
>> >>> > >> > >
>> >>> > >> > > java.lang.ArrayIndexOutOfBoundsException: 32768
>> >>> > >> > >     at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:178)
>> >>> > >> > >     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:214)
>> >>> > >> > >     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:219)
>> >>> > >> > >     at org.apache.flink.types.StringValue.readString(StringValue.java:764)
>> >>> > >> > >     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:68)
>> >>> > >> > >     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
>> >>> > >> > >     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>> >>> > >> > >     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
>> >>> > >> > >     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
>> >>> > >> > >     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
>> >>> > >> > >     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>> >>> > >> > >     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
>> >>> > >> > >     at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
>> >>> > >> > >     at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
>> >>> > >> > >     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>> >>> > >> > >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
>> >>> > >> > >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
>> >>> > >> > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> >>> > >> > >     at java.lang.Thread.run(Thread.java:745)
>> >>> > >> > >
>> >>> > >> > > Here’s the configuration for each node:
>> >>> > >> > >
>> >>> > >> > > jobmanager.heap.mb: 2048
>> >>> > >> > > taskmanager.heap.mb: 4096
>> >>> > >> > > taskmanager.numberOfTaskSlots: 5
>> >>> > >> > >
>> >>> > >> > > I’m not even sure where to start with this one so any help is
>> >>> > >> > > appreciated.
>> >>> > >> > >
>> >>> > >> > > Thanks,
>> >>> > >> > > Ali