Hey Tran Nam-Luc,

You don't have to register the event with any serializer. Can you share the event code? I will look into it ASAP.
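Until I have seen the code, one generic thing to check: the EOFException at the bottom of the trace is thrown from readFully, so read() asked for more bytes than the serialized event contained. Here is a minimal sketch of that failure mode. It is not Flink code: ClockEvent, its payload field and the helper methods are hypothetical stand-ins built on plain java.io streams, and I am not claiming this is what goes wrong in your case.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;

public class ClockEventRoundTrip {

    /** Hypothetical event: a length-prefixed payload followed by an int clock. */
    static final class ClockEvent {
        byte[] payload = new byte[0];
        int clock;

        void write(DataOutputStream out) throws IOException {
            out.writeInt(payload.length); // length prefix
            out.write(payload);           // payload bytes
            out.writeInt(clock);          // the extra int member
        }

        void read(DataInputStream in) throws IOException {
            // Must consume fields in exactly the order write() produced them.
            int length = in.readInt();
            payload = new byte[length];
            in.readFully(payload);        // throws EOFException if bytes are missing
            clock = in.readInt();
        }
    }

    static byte[] serialize(ClockEvent event) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            event.write(out);
        }
        return bytes.toByteArray();
    }

    static ClockEvent deserialize(byte[] data) throws IOException {
        ClockEvent event = new ClockEvent();
        event.read(new DataInputStream(new ByteArrayInputStream(data)));
        return event;
    }

    /** Returns true if reading the data with its last dropBytes removed hits EOF. */
    static boolean truncatedReadFails(byte[] data, int dropBytes) throws IOException {
        byte[] cut = Arrays.copyOf(data, data.length - dropBytes);
        try {
            deserialize(cut);
            return false;
        } catch (EOFException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        ClockEvent event = new ClockEvent();
        event.payload = new byte[] {1, 2, 3};
        event.clock = 7;

        byte[] data = serialize(event);
        System.out.println("round-trip clock: " + deserialize(data).clock);
        System.out.println("truncated read fails: " + truncatedReadFails(data, 4));
    }
}
```

A round-trip test over one complete byte array, like the first call in main, passes whenever write() and read() agree with each other. It says nothing about what happens when the reader is handed fewer bytes than the writer produced, which is the case the second call exercises.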
The runtime is buffer oriented and events arrive as buffers before they are deserialized. That's why you see the getNextBuffer call in the stack trace.

– Ufuk

On Tuesday, June 23, 2015, Nam-Luc Tran <namluc.t...@euranova.eu> wrote:

> Hello fellow Flinksters,
>
> I currently work on implementing Stale Synchronous Parallel iterations
> from the current bulk iterations. I have replacement classes for
> IterationHeadPactTask, IterationSynchronizationTask and corresponding
> event handlers to do the job. Among the generated events, I have
> ClockTaskEvent that inherits from IterationEventWithAggregators and
> adds an Int member. I have implemented the write and read method
> accordingly and written serialization tests accordingly, inspired by
> EventAggregatorsTest.java. The tests pass and everything runs well on
> a local setup.
>
> Now, when run on a cluster, I encounter the following error:
>
> java.io.IOException: io.netty.handler.codec.DecoderException:
> java.lang.RuntimeException: Error while deserializing event.
>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkError(RemoteInputChannel.java:264)
>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:117)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:335)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:76)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>     at org.apache.flink.runtime.iterative.task.SSPClockSinkTask.readHeadEventChannel(SSPClockSinkTask.java:231)
>     at org.apache.flink.runtime.iterative.task.SSPClockSinkTask.invoke(SSPClockSinkTask.java:125)
>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:221)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: io.netty.handler.codec.DecoderException:
> java.lang.RuntimeException: Error while deserializing event.
>     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:99)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>     ... 1 more
> Caused by: java.lang.RuntimeException: Error while deserializing event.
>     at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.fromSerializedEvent(EventSerializer.java:78)
>     at org.apache.flink.runtime.io.network.netty.NettyMessage$TaskEventRequest.readFrom(NettyMessage.java:458)
>     at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:146)
>     at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:114)
>     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
>     ... 13 more
> Caused by: java.io.EOFException
>     at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:141)
>     at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:130)
>     at org.apache.flink.runtime.iterative.event.IterationEventWithAggregators.read(IterationEventWithAggregators.java:168)
>     at org.apache.flink.runtime.iterative.event.ClockTaskEvent.read(ClockTaskEvent.java:83)
>     at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.fromSerializedEvent(EventSerializer.java:73)
>     ... 17 more
>
> What am I missing here? Should I register the new event ClockTaskEvent
> to some serializer somewhere? Also, these lines bother me:
>
>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:117)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:335)
>
> Why is it going through the getNextBuffer method since ClockTaskEvent
> is an event and not a buffer?
>
> Thanks and best regards,
>
> Tran Nam-Luc