So the problem wasn’t in Flink after all. It turns out the data I was
receiving at the socket was incomplete. I went back and looked at the
way I’m sending data to the socket and realized that the socket was being
closed before all the data had been sent. I just needed to flush the stream
before closing the socket. I don’t see any more serialization errors.

Thanks everyone for the help and I apologize if I wasted your time with
this. I will stick with 0.9.1 for now but I’ll download and use 0.10 as
soon as it’s released.

Cheers,
Ali

On 2015-11-11, 6:00 PM, "Fabian Hueske" <fhue...@gmail.com> wrote:

>Hi Ali,
>
>Flink uses different serializers for different data types. For example,
>(boxed) primitives are serialized using dedicated serializers
>(IntSerializer, StringSerializer, etc.) and the ProtocolEvent class is
>recognized as a Pojo type and therefore serialized using Flink's
>PojoSerializer.
>Types that cannot be (fully) analyzed are handled as GenericTypes and
>serialized using Flink's KryoSerializer.
>
>By forcing Kryo serialization as I suggested before, Pojo types (such as
>ProtocolEvent) will be serialized with Kryo instead of Flink's
>PojoSerializer.
>Hence, forcing Kryo only affects Pojo types. GenericTypes (such as
>ProtocolAttributeMap and ProtocolDetailMap) are always handled by Kryo
>(also without forcing it).
>
>The exceptions you are facing might be caused by a bug in the
>KryoSerializer that we recently fixed (see FLINK-2800 [1]). This bug
>essentially corrupts the stream of serialized data and might very well also
>be responsible for the original exception you posted. As you can see from
>the JIRA issue, the fix was merged to all active branches; however, it is
>not yet contained in an official release.
>
>I would recommend trying the latest candidate of the upcoming 0.10
>release [2] or building Flink from the 0.9-release branch [3].
>
>Please let me know if you have any questions or still face problems after
>switching to a version with the fix for FLINK-2800.
>
>Best, Fabian
>
>[1] https://issues.apache.org/jira/browse/FLINK-2800
>[2] http://people.apache.org/~mxm/flink-0.10.0-rc8/
>[3] https://github.com/apache/flink/tree/release-0.9
>
>2015-11-11 17:20 GMT+01:00 Kashmar, Ali <ali.kash...@emc.com>:
>
>> Fabian,
>>
>> I tried running it again and noticed there were some more exceptions in
>> the log. I fixed those and I don’t see the original error, but I do see
>> other ArrayIndexOutOfBoundsExceptions in the Kryo serializer code (I
>> didn’t even enable that yet like you suggested). Examples:
>>
>> 1)
>>
>> 10:49:36,331 ERROR org.apache.flink.streaming.api.collector.StreamOutput - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: 255
>>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectIntMap.java:364)
>>         at com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceResolver.java:47)
>>         at com.esotericsoftware.kryo.Kryo.reset(Kryo.java:836)
>>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:601)
>>         at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
>>         at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
>>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
>>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:372)
>>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:89)
>>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:29)
>>         at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>>         at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
>>         at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:58)
>>         at org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.java:62)
>>         at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>>         at org.apache.flink.streaming.api.operators.StreamSource$1.collect(StreamSource.java:40)
>>         at com.emc.ngen.analytics.flink.job.ClientTask.run(ClientTask.java:36)
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>> 2)
>> 10:49:36,333 ERROR org.apache.flink.streaming.api.collector.StreamOutput - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: 334
>>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:207)
>>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>>         at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
>>         at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:629)
>>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:594)
>>         at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
>>         at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
>>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
>>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:372)
>>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:89)
>>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:29)
>>         at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>>         at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
>>         at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:58)
>>         at org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.java:62)
>>         at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>>         at org.apache.flink.streaming.api.operators.StreamSource$1.collect(StreamSource.java:40)
>>         at com.emc.ngen.analytics.flink.job.ClientTask.run(ClientTask.java:36)
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> 3)
>>
>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 106
>>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:225)
>>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
>>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
>>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
>>         at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
>>         at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
>>         at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
>>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> 4)
>>
>> java.lang.IllegalArgumentException: You can store only Strings, Integer and Longs in the ProtocolDetailMap, not: 'false' for 'null'
>>         at io.pivotal.rti.protocols.ProtocolDetailMap.put(ProtocolDetailMap.java:100)
>>         at io.pivotal.rti.protocols.ProtocolDetailMap.put(ProtocolDetailMap.java:23)
>>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)
>>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:225)
>>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
>>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
>>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
>>         at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
>>         at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
>>         at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
>>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> 5)
>>
>>
>> java.lang.IndexOutOfBoundsException: Index: 85, Size: 9
>>         at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>>         at java.util.ArrayList.get(ArrayList.java:429)
>>         at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>>         at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
>>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:225)
>>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
>>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
>>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
>>         at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
>>         at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
>>         at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
>>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
>>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>> As you can tell, there’s a common theme here: the MapSerializer in Kryo.
>> The source of my grief seems to be the two java.util.Map implementations,
>> ProtocolAttributeMap and ProtocolDetailMap. They’re custom
>> implementations, and they’re strict about what types of objects you can
>> have as values.
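To illustrate that failure mode, here is a hypothetical stand-in (not the real ProtocolDetailMap) for a map that validates values in put(). Kryo's MapSerializer rebuilds a map entry by entry through put(), so if the stream is corrupted and a value decodes to an unexpected type, the map's own validation throws during deserialization, matching exception 4 above.

```java
import java.util.HashMap;

// Hypothetical sketch of a value-restricting map. Only String, Integer,
// and Long values are accepted; anything else throws, so a corrupted
// stream that decodes a Boolean where a Long was written surfaces as an
// IllegalArgumentException inside the deserializer.
public class RestrictiveDetailMap extends HashMap<String, Object> {
    @Override
    public Object put(String key, Object value) {
        if (!(value instanceof String || value instanceof Integer
                || value instanceof Long)) {
            throw new IllegalArgumentException(
                "You can store only Strings, Integers and Longs, not: '"
                    + value + "' for '" + key + "'");
        }
        return super.put(key, value);
    }
}
```

The validation itself is reasonable for application code; it just means any stream corruption upstream gets reported as a type-check failure rather than at its true source.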
>>
>>
>> Here’s the output of the gist you asked me to run:
>>
>> class org.apache.flink.api.java.typeutils.PojoTypeInfo : class
>> io.pivotal.rti.protocols.ProtocolEvent
>> (
>> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class
>> java.lang.String
>> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class
>> java.lang.Long
>> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class
>> java.lang.Long
>> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class
>> java.lang.Long
>> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class
>> java.lang.String
>> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class
>> java.lang.String
>> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class
>> java.lang.String
>> class org.apache.flink.api.java.typeutils.GenericTypeInfo : class
>> io.pivotal.rti.protocols.ProtocolAttributeMap
>> class org.apache.flink.api.java.typeutils.GenericTypeInfo : class
>> io.pivotal.rti.protocols.ProtocolDetailMap
>> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class
>> java.lang.String
>> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class
>> java.lang.Short
>> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class
>> java.lang.String
>> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class
>> java.lang.String
>> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class
>> java.lang.String
>> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class
>> java.lang.Short
>> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class
>> java.lang.String
>> )
>>
>>
>> Right now, I’m using the Gson library to convert ProtocolEvent
>> instances to JSON and back. I think I have to write a custom converter to
>> make ProtocolEvent a proper POJO in order for it to work with the
>> serializers in Flink.
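For reference, Flink recognizes a class as a POJO when it is public, has a public no-argument constructor, and every field is either public or reachable through public getters and setters. A minimal sketch of that shape (the class name and fields here are hypothetical, not the real ProtocolEvent schema):

```java
// Hypothetical sketch of a class Flink's type analyzer can treat as a
// POJO: public class, public no-arg constructor, and public fields
// (private fields with public getters/setters work too).
public class ProtocolEventPojo {
    public String subscriberId; // illustrative fields, not the real schema
    public long timestamp;
    public String detailJson;   // e.g. nested structures pre-encoded as JSON

    // The no-argument constructor is required for POJO serialization.
    public ProtocolEventPojo() {}
}
```

Fields whose types the analyzer cannot fully inspect (such as the custom map classes above) fall back to GenericType handling via Kryo, which is why they show up as GenericTypeInfo in the printout.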
>>
>> Sorry for the long email.
>>
>> Thanks,
>> Ali
>>
>>
>> On 2015-11-11, 10:02 AM, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>
>> >Hi Ali,
>> >
>> >I looked into this issue. The problem seems to occur because the
>> >deserializer reads more data than it should.
>> >This can happen for two reasons:
>> >  1) the meta information about how much data is safe to read is
>> >incorrect.
>> >  2) the serializer and deserializer logic are out of sync, which can
>> >cause the deserializer to read more data than the serializer wrote.
>> >
>> >The first case is less likely: Flink writes the binary length of each
>> >record in front of its serialized representation. This happens whenever
>> >data is sent over the network, regardless of the data type. A bug in this
>> >part would be critical, but it is also less likely because this code path
>> >is exercised very often and the problem has not shown up before.
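The length-prefix idea can be sketched with plain Java streams. This is only an illustration of length-prefixed framing, not Flink's actual wire format:

```java
import java.io.*;

public class LengthPrefixedFraming {
    // Write the record's byte length as a header before the payload.
    static void writeRecord(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length);
        out.write(payload);
    }

    // The reader trusts the header and reads exactly that many bytes.
    // If serializer and deserializer disagree about a record's layout,
    // this boundary is where reads spill into the next record.
    static byte[] readRecord(DataInputStream in) throws IOException {
        int len = in.readInt();
        byte[] payload = new byte[len];
        in.readFully(payload);
        return payload;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        writeRecord(out, "hello".getBytes("UTF-8"));
        writeRecord(out, "world!".getBytes("UTF-8"));

        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(new String(readRecord(in), "UTF-8")); // hello
        System.out.println(new String(readRecord(in), "UTF-8")); // world!
    }
}
```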
>> >
>> >IMO, this looks like an issue in the serialization logic. Looking at your
>> >code, the problem occurs when deserializing ProtocolEvent objects.
>> >Is it possible for you to share this class with me?
>> >
>> >If it is not possible to share the class, it would be good to know the
>> >field types of the Pojo and the associated TypeInformation.
>> >For that, you can run the code in this gist [1], which will recursively
>> >print the field types and their TypeInformation.
>> >
>> >As a temporary workaround, you can try to use Kryo to serialize and
>> >deserialize your Pojos as follows:
>> >ExecutionEnvironment env = ...
>> >env.getConfig().enableForceKryo();
>> >
>> >Best,
>> >Fabian
>> >
>> >[1] https://gist.github.com/fhueske/6c5aa386fc79ab69712b
>> >
>> >2015-11-11 10:38 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
>> >
>> >> Hi Ali,
>> >>
>> >> one more thing. Did that error occur once or is it reproducible?
>> >>
>> >> Thanks for your help,
>> >> Fabian
>> >>
>> >> 2015-11-11 9:50 GMT+01:00 Ufuk Celebi <u...@apache.org>:
>> >>
>> >>> Hey Ali,
>> >>>
>> >>> thanks for sharing the code. I assume that the custom ProtocolEvent,
>> >>> ProtocolDetailMap, and Subscriber types are all POJOs. They should
>> >>> not be a problem. I think this is a bug in Flink 0.9.1.
>> >>>
>> >>> Is it possible to re-run your program with the upcoming 0.10.0 (RC8)
>> >>> version and report back?
>> >>>
>> >>> 1) Add
>> >>> https://repository.apache.org/content/repositories/orgapacheflink-1055
>> >>> as a snapshot repository
>> >>>
>> >>> <repositories>
>> >>>   <repository>
>> >>>     <id>apache.snapshots</id>
>> >>>     <name>Apache Development Snapshot Repository</name>
>> >>>     <url>https://repository.apache.org/content/repositories/orgapacheflink-1055</url>
>> >>>     <releases>
>> >>>       <enabled>false</enabled>
>> >>>     </releases>
>> >>>     <snapshots>
>> >>>       <enabled>true</enabled>
>> >>>     </snapshots>
>> >>>   </repository>
>> >>> </repositories>
>> >>>
>> >>> 2) Set the Flink dependency version to 0.10.0
>> >>>
>> >>> 3) Use the Flink binary matching your Hadoop installation from here:
>> >>> http://people.apache.org/~mxm/flink-0.10.0-rc8/ (since you use Java,
>> >>> you can go with the Scala 2.10 builds)
>> >>>
>> >>> Sorry for the inconvenience! The release is about to be finished (the
>> >>> voting process is already going on).
>> >>>
>> >>> - Ufuk
>> >>>
>> >>>
>> >>> On Tue, Nov 10, 2015 at 8:05 PM, Kashmar, Ali <ali.kash...@emc.com>
>> >>> wrote:
>> >>>
>> >>> > Thanks for the quick reply, guys! A lot of interest in this one. I’ve
>> >>> > attached the source code. There are other supporting modules/classes,
>> >>> > but the main Flink component is in the included zip file.
>> >>> >
>> >>> > In answer to Fabian’s question: I’m using the 0.9.1 release right off
>> >>> > the website (flink-0.9.1-bin-hadoop1.tgz).
>> >>> >
>> >>> > In answer to Ufuk’s question: Yes, I’m using custom data types.
>> >>> >
>> >>> > Thanks,
>> >>> > Ali
>> >>> >
>> >>> >
>> >>> >
>> >>> > On 2015-11-10, 3:01 PM, "Ufuk Celebi" <u...@apache.org> wrote:
>> >>> >
>> >>> > >Thanks for reporting this. Are you using any custom data types?
>> >>> > >
>> >>> > >If you can share your code, it would be very helpful in order to
>> >>> > >debug this.
>> >>> > >
>> >>> > >- Ufuk
>> >>> > >
>> >>> > >On Tuesday, 10 November 2015, Fabian Hueske <fhue...@gmail.com>
>> >>>wrote:
>> >>> > >
>> >>> > >> I agree with Robert. Looks like a bug in Flink.
>> >>> > >> Maybe an off-by-one issue (the violating index is 32768 and the
>> >>> > >> default memory segment size is 32 KB).
>> >>> > >>
>> >>> > >> Which Flink version are you using?
>> >>> > >> In case you are using a custom build, can you share the commit ID
>> >>> > >> (it is reported in the first lines of the JobManager log file)?
>> >>> > >>
>> >>> > >> Thanks, Fabian
>> >>> > >>
>> >>> > >> 2015-11-10 18:29 GMT+01:00 Robert Metzger <rmetz...@apache.org
>> >>> > >> <javascript:;>>:
>> >>> > >>
>> >>> > >> > Hi Ali,
>> >>> > >> >
>> >>> > >> > this could be a bug in Flink.
>> >>> > >> > Can you share the code of your program with us to debug the
>> >>>issue?
>> >>> > >> >
>> >>> > >> > On Tue, Nov 10, 2015 at 6:25 PM, Kashmar, Ali
>> >>> > >> > <ali.kash...@emc.com> wrote:
>> >>> > >> >
>> >>> > >> > > Hello,
>> >>> > >> > >
>> >>> > >> > > I’m getting this error while running a streaming module on a
>> >>> > >> > > cluster of 3 nodes:
>> >>> > >> > >
>> >>> > >> > >
>> >>> > >> > > java.lang.ArrayIndexOutOfBoundsException: 32768
>> >>> > >> > >         at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:178)
>> >>> > >> > >         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:214)
>> >>> > >> > >         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:219)
>> >>> > >> > >         at org.apache.flink.types.StringValue.readString(StringValue.java:764)
>> >>> > >> > >         at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:68)
>> >>> > >> > >         at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
>> >>> > >> > >         at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>> >>> > >> > >         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
>> >>> > >> > >         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
>> >>> > >> > >         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
>> >>> > >> > >         at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>> >>> > >> > >         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
>> >>> > >> > >         at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
>> >>> > >> > >         at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
>> >>> > >> > >         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>> >>> > >> > >         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
>> >>> > >> > >         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
>> >>> > >> > >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> >>> > >> > >         at java.lang.Thread.run(Thread.java:745)
>> >>> > >> > >
>> >>> > >> > >
>> >>> > >> > > Here’s the configuration for each node:
>> >>> > >> > >
>> >>> > >> > >
>> >>> > >> > > jobmanager.heap.mb: 2048
>> >>> > >> > >
>> >>> > >> > > taskmanager.heap.mb: 4096
>> >>> > >> > >
>> >>> > >> > > taskmanager.numberOfTaskSlots: 5
>> >>> > >> > >
>> >>> > >> > >
>> >>> > >> > > I’m not even sure where to start with this one, so any help
>> >>> > >> > > is appreciated.
>> >>> > >> > >
>> >>> > >> > >
>> >>> > >> > > Thanks,
>> >>> > >> > >
>> >>> > >> > > Ali
>> >>> > >> > >
>> >>> > >> >
>> >>> > >>
>> >>> >
>> >>> >
>> >>>
>> >>
>> >>
>>
>>
