Ah, no problem.
Glad you could resolve your problem :-)

Thanks for reporting back.

Cheers, Fabian

2015-11-12 17:42 GMT+01:00 Kashmar, Ali <ali.kash...@emc.com>:

> So the problem wasn't in Flink after all. It turns out the data I was
> receiving at the socket was incomplete. I went back and looked at the
> way I'm sending data to the socket and realized that the socket was
> being closed before all the data had been sent. I just needed to flush
> the stream before closing the socket. I don't see any more
> serialization errors now.
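For reference, the flush-before-close fix described above can be sketched like this (a minimal, hypothetical illustration; the actual sender code is not part of this thread). A buffered stream, like one typically wrapped around a socket's output stream, holds bytes in memory until `flush()` is called, so closing the connection abruptly can drop the tail of the data:

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Illustration of the failure mode: a BufferedOutputStream (like one
// wrapping a Socket's output stream) keeps bytes in its buffer until
// flush() is called. Tearing down the connection without flushing can
// leave the last records undelivered, which the receiver then sees as
// a truncated, seemingly corrupted stream.
public class FlushBeforeClose {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream wire = new ByteArrayOutputStream(); // stand-in for the socket stream
        BufferedOutputStream out = new BufferedOutputStream(wire, 8192);

        out.write("event-1\n".getBytes("UTF-8"));
        System.out.println("bytes on the wire before flush: " + wire.size());

        out.flush(); // push buffered bytes down before closing
        System.out.println("bytes on the wire after flush:  " + wire.size());
        out.close();
    }
}
```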
>
> Thanks everyone for the help and I apologize if I wasted your time with
> this. I will stick with 0.9.1 for now but I’ll download and use 0.10 as
> soon as it’s released.
>
> Cheers,
> Ali
>
> On 2015-11-11, 6:00 PM, "Fabian Hueske" <fhue...@gmail.com> wrote:
>
> >Hi Ali,
> >
> >Flink uses different serializers for different data types. For example,
> >(boxed) primitives are serialized using dedicated serializers
> >(IntSerializer, StringSerializer, etc.) and the ProtocolEvent class is
> >recognized as a Pojo type and therefore serialized using Flink's
> >PojoSerializer.
> >Types that cannot be (fully) analyzed are handled as GenericTypes and
> >serialized using Flink's KryoSerializer.
> >
> >By forcing Kryo serialization as I suggested before, Pojo types (such as
> >ProtocolEvent) will be serialized with Kryo instead of Flink's
> >PojoSerializer.
> >Hence, forcing Kryo only affects Pojo types. GenericTypes (such as
> >ProtocolAttributeMap and ProtocolDetailMap) are always handled by Kryo
> >(also without forcing it).
> >
> >The exceptions you are facing might be caused by a bug in the
> >KryoSerializer that we recently fixed (see FLINK-2800 [1]). This bug
> >basically corrupts the stream of serialized data and might very well
> >also be responsible for the original exception you posted. As you can
> >see from the JIRA issue, a bug fix was merged to all active branches;
> >however, it is not yet contained in an official release.
> >
> >I would recommend trying the latest candidate of the upcoming 0.10
> >release [2] or building Flink from the 0.9-release branch [3].
> >
> >Please let me know if you have any questions or are still facing
> >problems after switching to a version with a fix for FLINK-2800.
> >
> >Best, Fabian
> >
> >[1] https://issues.apache.org/jira/browse/FLINK-2800
> >[2] http://people.apache.org/~mxm/flink-0.10.0-rc8/
> >[3] https://github.com/apache/flink/tree/release-0.9
> >
> >2015-11-11 17:20 GMT+01:00 Kashmar, Ali <ali.kash...@emc.com>:
> >
> >> Fabian,
> >>
> >> I tried running it again and noticed there were some more exceptions
> >> in the log. I fixed those and no longer see the original error, but I
> >> do see other ArrayIndexOutOfBoundsExceptions in the Kryo serializer
> >> code (I didn't even enable forced Kryo serialization yet, as you
> >> suggested). Examples:
> >>
> >> 1)
> >>
> >> 10:49:36,331 ERROR org.apache.flink.streaming.api.collector.StreamOutput - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: 255
> >>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectIntMap.java:364)
> >>         at com.esotericsoftware.kryo.util.MapReferenceResolver.reset(MapReferenceResolver.java:47)
> >>         at com.esotericsoftware.kryo.Kryo.reset(Kryo.java:836)
> >>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:601)
> >>         at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95)
> >>         at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
> >>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> >>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
> >>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:372)
> >>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:89)
> >>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:29)
> >>         at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
> >>         at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
> >>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
> >>         at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:58)
> >>         at org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.java:62)
> >>         at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >>         at org.apache.flink.streaming.api.operators.StreamSource$1.collect(StreamSource.java:40)
> >>         at com.emc.ngen.analytics.flink.job.ClientTask.run(ClientTask.java:36)
> >>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>         at java.lang.Thread.run(Thread.java:745)
> >>
> >>
> >>
> >> 2)
> >> 10:49:36,333 ERROR org.apache.flink.streaming.api.collector.StreamOutput - Emit failed due to: java.lang.ArrayIndexOutOfBoundsException: 334
> >>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:207)
> >>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
> >>         at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
> >>         at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:629)
> >>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:594)
> >>         at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
> >>         at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
> >>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
> >>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:186)
> >>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:372)
> >>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:89)
> >>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:29)
> >>         at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
> >>         at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
> >>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
> >>         at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:58)
> >>         at org.apache.flink.streaming.api.collector.StreamOutput.collect(StreamOutput.java:62)
> >>         at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
> >>         at org.apache.flink.streaming.api.operators.StreamSource$1.collect(StreamSource.java:40)
> >>         at com.emc.ngen.analytics.flink.job.ClientTask.run(ClientTask.java:36)
> >>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>         at java.lang.Thread.run(Thread.java:745)
> >>
> >> 3)
> >>
> >> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 106
> >>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
> >>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> >>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> >>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
> >>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> >>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> >>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
> >>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:225)
> >>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
> >>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
> >>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
> >>         at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> >>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
> >>         at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
> >>         at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
> >>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> >>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
> >>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
> >>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >>         at java.lang.Thread.run(Thread.java:745)
> >>
> >> 4)
> >>
> >> java.lang.IllegalArgumentException: You can store only Strings, Integer and Longs in the ProtocolDetailMap, not: 'false' for 'null'
> >>         at io.pivotal.rti.protocols.ProtocolDetailMap.put(ProtocolDetailMap.java:100)
> >>         at io.pivotal.rti.protocols.ProtocolDetailMap.put(ProtocolDetailMap.java:23)
> >>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)
> >>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> >>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> >>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
> >>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:225)
> >>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
> >>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
> >>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
> >>         at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> >>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
> >>         at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
> >>         at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
> >>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> >>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
> >>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
> >>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >>         at java.lang.Thread.run(Thread.java:745)
> >>
> >> 5)
> >>
> >>
> >> java.lang.IndexOutOfBoundsException: Index: 85, Size: 9
> >>         at java.util.ArrayList.rangeCheck(ArrayList.java:653)
> >>         at java.util.ArrayList.get(ArrayList.java:429)
> >>         at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> >>         at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> >>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> >>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
> >>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> >>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> >>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
> >>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:225)
> >>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
> >>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
> >>         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
> >>         at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> >>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
> >>         at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
> >>         at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
> >>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> >>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
> >>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
> >>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >>         at java.lang.Thread.run(Thread.java:745)
> >>
> >>
> >>
> >> As you can tell, there's a common theme there: the MapSerializer in
> >> Kryo. The source of my grief seems to be the two java.util.Map
> >> implementations, ProtocolAttributeMap and ProtocolDetailMap. They're
> >> custom implementations and they're very strict about what types of
> >> objects you can have as values.
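That strictness explains exception (4) above: Kryo's MapSerializer rebuilds the map during deserialization by calling put() for each entry (visible in the stack trace at ProtocolDetailMap.put called from MapSerializer.read), so a validating put() throws as soon as a corrupted entry deserializes to an unexpected type. A rough, hypothetical sketch of such a validating map (the real ProtocolDetailMap is not shown in this thread):

```java
import java.util.HashMap;

// Hypothetical reconstruction of a validating map like ProtocolDetailMap.
// A generic deserializer that rebuilds the map entry by entry through
// put() will trip this check whenever a corrupted stream produces a
// value of an unexpected type, which matches exception (4) above.
public class StrictDetailMap extends HashMap<String, Object> {
    @Override
    public Object put(String key, Object value) {
        if (!(value instanceof String || value instanceof Integer || value instanceof Long)) {
            throw new IllegalArgumentException(
                "You can store only Strings, Integers and Longs, not: '" + value + "'");
        }
        return super.put(key, value);
    }
}
```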
> >>
> >>
> >> Here’s the output of the gist you asked me to run:
> >>
> >> class org.apache.flink.api.java.typeutils.PojoTypeInfo : class io.pivotal.rti.protocols.ProtocolEvent
> >> (
> >> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
> >> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Long
> >> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Long
> >> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Long
> >> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
> >> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
> >> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
> >> class org.apache.flink.api.java.typeutils.GenericTypeInfo : class io.pivotal.rti.protocols.ProtocolAttributeMap
> >> class org.apache.flink.api.java.typeutils.GenericTypeInfo : class io.pivotal.rti.protocols.ProtocolDetailMap
> >> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
> >> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Short
> >> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
> >> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
> >> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
> >> class org.apache.flink.api.common.typeinfo.IntegerTypeInfo : class java.lang.Short
> >> class org.apache.flink.api.common.typeinfo.BasicTypeInfo : class java.lang.String
> >> )
> >>
> >>
> >> Right now, I'm using the Gson library to convert the ProtocolEvent
> >> instance to JSON and back. I think I have to write a custom converter
> >> to make ProtocolEvent a proper POJO in order for it to work with
> >> Flink's serializers.
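As background for that conversion: Flink treats a class as a POJO when it is public, has a public no-argument constructor, and every field is either public or reachable through public getters and setters; anything else falls back to a GenericType handled by Kryo. A minimal sketch of what a Flink-analyzable version could look like (the field names here are illustrative assumptions, not the actual ProtocolEvent definition):

```java
// Hypothetical sketch of a Flink-friendly POJO. Requirements for
// Flink's PojoSerializer: public class, public no-arg constructor,
// and fields that are public or exposed via public getters/setters.
public class ProtocolEventPojo {
    public String subscriberId;   // public field: analyzed directly

    private long timestamp;       // private field: needs getter/setter

    public ProtocolEventPojo() {} // required no-arg constructor

    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long ts) { timestamp = ts; }
}
```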
> >>
> >> Sorry for the long email.
> >>
> >> Thanks,
> >> Ali
> >>
> >>
> >> On 2015-11-11, 10:02 AM, "Fabian Hueske" <fhue...@gmail.com> wrote:
> >>
> >> >Hi Ali,
> >> >
> >> >I looked into this issue. The problem seems to occur because the
> >> >deserializer reads more data than it should.
> >> >This might happen for two reasons:
> >> >  1) the meta information about how much data is safe to read is
> >> >incorrect.
> >> >  2) the serializer and deserializer logic are not in sync, which can
> >> >cause the deserializer to read more data than the serializer wrote.
> >> >
> >> >The first case is less likely: Flink writes the binary length of each
> >> >record in front of its serialized representation. This happens
> >> >whenever data is sent over the network, regardless of the data type.
> >> >A bug in this part would be critical, but it is also unlikely because
> >> >this code path is exercised very often and no such bug has surfaced
> >> >yet.
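The length-prefix framing described above can be sketched as follows (an illustrative simplification, not Flink's actual wire format): the writer records how many bytes each record occupies, so the reader knows exactly where a record ends, and a deserializer that consumes a different number of bytes than the serializer wrote becomes detectable as stream corruption.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative length-prefixed framing. The 4-byte prefix tells the
// reader exactly how many bytes belong to the record, independent of
// the record's data type.
public class LengthPrefixedFraming {
    static byte[] frame(byte[] record) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(record.length); // length prefix
        out.write(record);           // payload
        return buf.toByteArray();
    }

    static byte[] unframe(DataInputStream in) throws IOException {
        int len = in.readInt();      // read exactly the advertised length
        byte[] record = new byte[len];
        in.readFully(record);
        return record;
    }

    public static void main(String[] args) throws IOException {
        byte[] framed = frame("hello".getBytes("UTF-8"));
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed));
        System.out.println(new String(unframe(in), "UTF-8")); // prints "hello"
    }
}
```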
> >> >
> >> >IMO, this looks like an issue in the serialization logic. Looking at
> >> >your code, the problem occurs when deserializing ProtocolEvent
> >> >objects. Would it be possible for you to share this class with me?
> >> >
> >> >If it is not possible to share the class, it would be good to know
> >> >the field types of the Pojo and the associated TypeInformation.
> >> >For that, you can run the code in this gist [1], which will
> >> >recursively print the field types and their TypeInformation.
> >> >
> >> >As a temporary workaround, you can try to use Kryo to serialize and
> >> >deserialize your Pojos as follows:
> >> >ExecutionEnvironment env = ...
> >> >env.getConfig().enableForceKryo();
> >> >
> >> >Best,
> >> >Fabian
> >> >
> >> >[1] https://gist.github.com/fhueske/6c5aa386fc79ab69712b
> >> >
> >> >2015-11-11 10:38 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
> >> >
> >> >> Hi Ali,
> >> >>
> >> >> one more thing. Did that error occur once or is it reproducible?
> >> >>
> >> >> Thanks for your help,
> >> >> Fabian
> >> >>
> >> >> 2015-11-11 9:50 GMT+01:00 Ufuk Celebi <u...@apache.org>:
> >> >>
> >> >>> Hey Ali,
> >> >>>
> >> >>> thanks for sharing the code. I assume that the custom
> >> >>> ProtocolEvent, ProtocolDetailMap, and Subscriber types are all
> >> >>> POJOs. They should not be a problem. I think this is a bug in
> >> >>> Flink 0.9.1.
> >> >>>
> >> >>> Is it possible to re-run your program with the upcoming 0.10.0 (RC8)
> >> >>> version and report back?
> >> >>>
> >> >>> 1) Add
> >> >>> https://repository.apache.org/content/repositories/orgapacheflink-1055
> >> >>> as a snapshot repository:
> >> >>>
> >> >>> <repositories>
> >> >>> <repository>
> >> >>> <id>apache.snapshots</id>
> >> >>> <name>Apache Development Snapshot Repository</name>
> >> >>> <url>
> >> >>>
> >>https://repository.apache.org/content/repositories/orgapacheflink-1055
> >> >>> </url>
> >> >>> <releases>
> >> >>> <enabled>false</enabled>
> >> >>> </releases>
> >> >>> <snapshots>
> >> >>> <enabled>true</enabled>
> >> >>> </snapshots>
> >> >>> </repository>
> >> >>> </repositories>
> >> >>>
> >> >>> 2) Set the Flink dependency version to 0.10.0
> >> >>>
> >> >>> 3) Use the Flink binary matching your Hadoop installation from here:
> >> >>> http://people.apache.org/~mxm/flink-0.10.0-rc8/ (since you use Java,
> >> >>> you can go with the Scala 2.10 builds)
> >> >>>
> >> >>> Sorry for the inconvenience! The release is about to be finished
> >> >>> (the voting process is already underway).
> >> >>>
> >> >>> – Ufuk
> >> >>>
> >> >>>
> >> >>> On Tue, Nov 10, 2015 at 8:05 PM, Kashmar, Ali <ali.kash...@emc.com>
> >> >>> wrote:
> >> >>>
> >> >>> > Thanks for the quick reply guys! A lot of interest in this one.
> >> >>> > I've attached the source code. There are other supporting
> >> >>> > modules/classes, but the main Flink component is in the included
> >> >>> > zip file.
> >> >>> >
> >> >>> > In answer to Fabian's question: I'm using the 0.9.1 release right
> >> >>> > off the website (flink-0.9.1-bin-hadoop1.tgz).
> >> >>> >
> >> >>> > In answer to Ufuk's question: Yes, I'm using custom data types.
> >> >>> >
> >> >>> > Thanks,
> >> >>> > Ali
> >> >>> >
> >> >>> >
> >> >>> >
> >> >>> > On 2015-11-10, 3:01 PM, "Ufuk Celebi" <u...@apache.org> wrote:
> >> >>> >
> >> >>> > >Thanks for reporting this. Are you using any custom data types?
> >> >>> > >
> >> >>> > >If you can share your code, it would be very helpful for
> >> >>> > >debugging this.
> >> >>> > >
> >> >>> > >– Ufuk
> >> >>> > >
> >> >>> > >On Tuesday, 10 November 2015, Fabian Hueske <fhue...@gmail.com> wrote:
> >> >>> > >
> >> >>> > >> I agree with Robert. Looks like a bug in Flink.
> >> >>> > >> Maybe an off-by-one issue (the violating index is 32768 and
> >> >>> > >> the default memory segment size is 32 KB).
> >> >>> > >>
> >> >>> > >> Which Flink version are you using?
> >> >>> > >> In case you are using a custom build, can you share the commit
> >> >>> > >> ID (it is reported in the first lines of the JobManager log
> >> >>> > >> file)?
> >> >>> > >>
> >> >>> > >> Thanks, Fabian
> >> >>> > >>
> >> >>> > >> 2015-11-10 18:29 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
> >> >>> > >>
> >> >>> > >> > Hi Ali,
> >> >>> > >> >
> >> >>> > >> > this could be a bug in Flink.
> >> >>> > >> > Can you share the code of your program with us to debug the
> >> >>>issue?
> >> >>> > >> >
> >> >>> > >> > On Tue, Nov 10, 2015 at 6:25 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
> >> >>> > >> >
> >> >>> > >> > > Hello,
> >> >>> > >> > >
> >> >>> > >> > > I'm getting this error while running a streaming module on
> >> >>> > >> > > a cluster of 3 nodes:
> >> >>> > >> > >
> >> >>> > >> > >
> >> >>> > >> > > java.lang.ArrayIndexOutOfBoundsException: 32768
> >> >>> > >> > >         at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:178)
> >> >>> > >> > >         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:214)
> >> >>> > >> > >         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:219)
> >> >>> > >> > >         at org.apache.flink.types.StringValue.readString(StringValue.java:764)
> >> >>> > >> > >         at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:68)
> >> >>> > >> > >         at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:73)
> >> >>> > >> > >         at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> >> >>> > >> > >         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:499)
> >> >>> > >> > >         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:102)
> >> >>> > >> > >         at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:29)
> >> >>> > >> > >         at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> >> >>> > >> > >         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:110)
> >> >>> > >> > >         at org.apache.flink.streaming.runtime.io.StreamingAbstractRecordReader.getNextRecord(StreamingAbstractRecordReader.java:80)
> >> >>> > >> > >         at org.apache.flink.streaming.runtime.io.StreamingMutableRecordReader.next(StreamingMutableRecordReader.java:36)
> >> >>> > >> > >         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> >> >>> > >> > >         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.readNext(OneInputStreamTask.java:68)
> >> >>> > >> > >         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:101)
> >> >>> > >> > >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >> >>> > >> > >         at java.lang.Thread.run(Thread.java:745)
> >> >>> > >> > >
> >> >>> > >> > >
> >> >>> > >> > > Here's the configuration for each node:
> >> >>> > >> > >
> >> >>> > >> > >
> >> >>> > >> > > jobmanager.heap.mb: 2048
> >> >>> > >> > >
> >> >>> > >> > > taskmanager.heap.mb: 4096
> >> >>> > >> > >
> >> >>> > >> > > taskmanager.numberOfTaskSlots: 5
> >> >>> > >> > >
> >> >>> > >> > >
> >> >>> > >> > > I'm not even sure where to start with this one, so any
> >> >>> > >> > > help is appreciated.
> >> >>> > >> > >
> >> >>> > >> > >
> >> >>> > >> > > Thanks,
> >> >>> > >> > >
> >> >>> > >> > > Ali
> >> >>> > >> > >
> >> >>> > >> >
> >> >>> > >>
> >> >>> >
> >> >>> >
> >> >>>
> >> >>
> >> >>
> >>
> >>
>
>
