Hello,

I am currently working on integrating the Flink Streaming API into SAMOA,
and I am running into an exception thrown by the Kryo serializer:

Caused by: java.lang.ArrayIndexOutOfBoundsException
at java.lang.System.arraycopy(Native Method)
at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:238)
at org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read(SpillingAdaptiveSpanningRecordDeserializer.java:410)
at org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68)
at com.esotericsoftware.kryo.io.Input.fill(Input.java:134)
at com.esotericsoftware.kryo.io.Input.require(Input.java:154)
at com.esotericsoftware.kryo.io.Input.readInt(Input.java:303)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:103)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:596)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:707)
at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:195)


Specifically, I am working with Flink-0.9-SNAPSHOT, and the exception is
thrown in the custom class "FlinkProcessingItem", which extends the
"StreamInvokable" class. It occurs in the "invoke" function, when the
readNext() function of StreamInvokable is called.

The object that the "readNext" function is supposed to receive is a
custom Tuple3 object, called SamoaType and defined like this:
"SamoaType extends Tuple3<String, ContentEvent, String>", where
ContentEvent is an interface of SAMOA.
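
To make the shape of this type concrete, here is a minimal self-contained
sketch. It is not the actual SAMOA or Flink code: Tuple3 and ContentEvent
below are simplified stand-ins so that the snippet compiles on its own, and
DummyEvent and the constructor parameter names are made up for illustration.
The point it shows is that the middle field is declared as an interface, so
the concrete class is only known at runtime, which seems consistent with
Kryo.readClassAndObject appearing in the trace above.

```java
import java.io.Serializable;

public class SamoaTypeSketch {

    // Simplified stand-in for SAMOA's ContentEvent interface (assumption:
    // the real interface has more methods than this).
    public interface ContentEvent extends Serializable {
        String getKey();
    }

    // Minimal stand-in for Flink's Tuple3, which exposes public fields f0..f2.
    public static class Tuple3<T0, T1, T2> {
        public T0 f0;
        public T1 f1;
        public T2 f2;

        public Tuple3() {}

        public Tuple3(T0 f0, T1 f1, T2 f2) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
        }
    }

    // The custom tuple from the description: the middle field is typed as
    // the ContentEvent interface, not as a concrete class.
    public static class SamoaType extends Tuple3<String, ContentEvent, String> {
        public SamoaType() {}

        // Parameter names are hypothetical; only the field types come from
        // the definition quoted above.
        public SamoaType(String first, ContentEvent event, String third) {
            super(first, event, third);
        }
    }

    // Hypothetical concrete event, standing in for InstanceContentEvent.
    public static class DummyEvent implements ContentEvent {
        private final String key;

        public DummyEvent(String key) {
            this.key = key;
        }

        @Override
        public String getKey() {
            return key;
        }
    }

    public static void main(String[] args) {
        SamoaType record =
                new SamoaType("key-1", new DummyEvent("key-1"), "stream-0");
        // The declared type of f1 is ContentEvent; the runtime class differs.
        System.out.println(record.f1.getClass().getSimpleName());
    }
}
```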

The type information for the custom SamoaType is supplied to the source
using "TypeExtractor.getForObject".

The ContentEvent object that is sent between the two Invokables is of type
"InstanceContentEvent", which implements ContentEvent. You can find it at
the following link:
InstanceContentEvent
<https://github.com/yahoo/samoa/blob/master/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/InstanceContentEvent.java>

We managed to reproduce the exception in the following test program:
TestSerialization
<https://github.com/senorcarbone/samoa/commit/9eba049031aee85d1bef58dcdaf37110b9fe4505>


Lastly, I should mention that the same example runs without this exception
in Storm, even though Storm also uses Kryo.

Thank you,
Fay
