Hello fellow Flinksters,

I am currently implementing Stale Synchronous Parallel (SSP) iterations, based on the current bulk iterations. I have replacement classes for IterationHeadPactTask and IterationSynchronizationTask, plus the corresponding event handlers. Among the generated events is ClockTaskEvent, which inherits from IterationEventWithAggregators and adds an integer member. I have implemented the write and read methods accordingly and written serialization tests inspired by EventAggregatorsTest.java. The tests pass and everything runs well on a local setup.
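To make the setup concrete, here is a minimal, self-contained sketch of the pattern described above. It does not use the Flink classes: BaseEvent and ClockEvent are hypothetical stand-ins for IterationEventWithAggregators and ClockTaskEvent, and the length-prefixed payload is only an assumption about what a parent event might carry. It shows a subclass that appends one integer after the parent's fields, with read mirroring write in the same order.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ClockEventSketch {

    // Hypothetical stand-in for the parent event: a length-prefixed byte payload.
    static class BaseEvent {
        byte[] payload = new byte[0];

        void write(DataOutputStream out) throws IOException {
            out.writeInt(payload.length);
            out.write(payload);
        }

        void read(DataInputStream in) throws IOException {
            int length = in.readInt();
            payload = new byte[length];
            // readFully throws EOFException if fewer than 'length' bytes remain.
            in.readFully(payload);
        }
    }

    // Hypothetical stand-in for the subclass: adds one int after the parent's fields.
    static class ClockEvent extends BaseEvent {
        int clock;

        @Override
        void write(DataOutputStream out) throws IOException {
            super.write(out);    // parent fields first
            out.writeInt(clock); // then the extra member
        }

        @Override
        void read(DataInputStream in) throws IOException {
            super.read(in);      // same order as write
            clock = in.readInt();
        }
    }

    static byte[] serialize(ClockEvent event) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            event.write(out);
        }
        return bytes.toByteArray();
    }

    static ClockEvent deserialize(byte[] data) throws IOException {
        ClockEvent event = new ClockEvent();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            event.read(in);
        }
        return event;
    }

    public static void main(String[] args) throws IOException {
        ClockEvent original = new ClockEvent();
        original.payload = new byte[] {1, 2, 3};
        original.clock = 42;

        ClockEvent copy = deserialize(serialize(original));
        System.out.println("clock after round trip: " + copy.clock);
    }
}
```

In this sketch, an in-memory round trip succeeds as long as read and write are symmetric. If the reader is handed fewer bytes than the writer produced, readFully in the parent's read fails with an EOFException.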
Now, when I run it on a cluster, I encounter the following error:

java.io.IOException: io.netty.handler.codec.DecoderException: java.lang.RuntimeException: Error while deserializing event.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.checkError(RemoteInputChannel.java:264)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:117)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:335)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:76)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
    at org.apache.flink.runtime.iterative.task.SSPClockSinkTask.readHeadEventChannel(SSPClockSinkTask.java:231)
    at org.apache.flink.runtime.iterative.task.SSPClockSinkTask.invoke(SSPClockSinkTask.java:125)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:221)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.handler.codec.DecoderException: java.lang.RuntimeException: Error while deserializing event.
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    ... 1 more
Caused by: java.lang.RuntimeException: Error while deserializing event.
    at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.fromSerializedEvent(EventSerializer.java:78)
    at org.apache.flink.runtime.io.network.netty.NettyMessage$TaskEventRequest.readFrom(NettyMessage.java:458)
    at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:146)
    at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:114)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
    ... 13 more
Caused by: java.io.EOFException
    at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:141)
    at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:130)
    at org.apache.flink.runtime.iterative.event.IterationEventWithAggregators.read(IterationEventWithAggregators.java:168)
    at org.apache.flink.runtime.iterative.event.ClockTaskEvent.read(ClockTaskEvent.java:83)
    at org.apache.flink.runtime.io.network.api.serialization.EventSerializer.fromSerializedEvent(EventSerializer.java:73)
    ... 17 more

What am I missing here? Should I register the new ClockTaskEvent with some serializer somewhere?

Also, these lines bother me:

    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:117)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:335)

Why is it going through the getNextBuffer method, given that ClockTaskEvent is an event and not a buffer?

Thanks and best regards,
Tran Nam-Luc